import gradio as gr import time import json import os import subprocess from datetime import datetime, timedelta from typing import List, Tuple, Optional from backend import ( ALLOWED_LANGS, AUDIO_FORMATS, transcription_manager, allowed_file, User ) from ai_summary import ai_summary_manager def format_status(status): """Convert status to user-friendly format""" status_map = { 'pending': '⏳ Queued', 'processing': '🔄 Processing', 'completed': '✅ Done', 'failed': '❌ Failed' } return status_map.get(status, status) def format_processing_time(created_at, completed_at=None): """Calculate and format processing time""" try: start_time = datetime.fromisoformat(created_at) if completed_at: end_time = datetime.fromisoformat(completed_at) duration = end_time - start_time else: duration = datetime.now() - start_time total_seconds = int(duration.total_seconds()) if total_seconds < 60: return f"{total_seconds}s" elif total_seconds < 3600: minutes = total_seconds // 60 seconds = total_seconds % 60 return f"{minutes}m {seconds}s" else: hours = total_seconds // 3600 minutes = (total_seconds % 3600) // 60 return f"{hours}h {minutes}m" except: return "Unknown" def get_user_stats_display(user: User): """Get comprehensive user statistics for display""" if not user: return "👤 Please log in to view statistics" try: # Get transcript stats transcript_stats = transcription_manager.get_user_stats(user.user_id) # Get AI summary stats summary_stats = transcription_manager.get_user_summary_stats(user.user_id) total_transcripts = transcript_stats.get('total_jobs', 0) total_summaries = summary_stats.get('total_jobs', 0) stats_text = f"👤 {user.username} | 🎙️ Transcripts: {total_transcripts} | 🤖 AI Summaries: {total_summaries}" # Add processing status processing_transcripts = transcript_stats.get('by_status', {}).get('processing', 0) processing_summaries = summary_stats.get('by_status', {}).get('processing', 0) if processing_transcripts > 0: stats_text += f" | 🔄 Transcribing: {processing_transcripts}" if processing_summaries > 0: stats_text += f" | 🔄 Summarizing: {processing_summaries}" return stats_text except Exception as e: return f"👤 {user.username} | Stats error: {str(e)}" # Authentication Functions (same as before) def register_user(email, username, password, confirm_password, gdpr_consent, data_retention_consent, marketing_consent): """Register new user account""" try: print(f"📝 Registration attempt for: {username} ({email})") # Validate inputs if not email or not username or not password: return "❌ All fields are required", gr.update(visible=False) if password != confirm_password: return "❌ Passwords do not match", gr.update(visible=False) if not gdpr_consent: return "❌ GDPR consent is required to create an account", gr.update(visible=False) if not data_retention_consent: return "❌ Data retention agreement is required", gr.update(visible=False) # Attempt registration success, message, user_id = transcription_manager.register_user( email, username, password, gdpr_consent, data_retention_consent, marketing_consent ) print(f"📝 Registration result: success={success}, message={message}") if success: print(f"✅ User registered successfully: {username}") return f"✅ {message}! Please log in with your credentials.", gr.update(visible=True) else: print(f"❌ Registration failed: {message}") return f"❌ {message}", gr.update(visible=False) except Exception as e: print(f"❌ Registration error: {str(e)}") return f"❌ Registration error: {str(e)}", gr.update(visible=False) def login_user(login, password): """Login user""" try: print(f"🔐 Login attempt for: {login}") if not login or not password: return "❌ Please enter both username/email and password", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." success, message, user = transcription_manager.login_user(login, password) print(f"🔐 Login result: success={success}, message={message}") if success and user: print(f"✅ User logged in successfully: {user.username}") stats_display = get_user_stats_display(user) return f"✅ Welcome back, {user.username}!", user, gr.update(visible=False), gr.update(visible=True), stats_display else: print(f"❌ Login failed: {message}") return f"❌ {message}", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." except Exception as e: print(f"❌ Login error: {str(e)}") return f"❌ Login error: {str(e)}", None, gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." def logout_user(): """Logout user""" print("👋 User logged out") return None, "👋 You have been logged out. Please log in to continue.", gr.update(visible=True), gr.update(visible=False), "👤 Please log in to view your statistics..." # Transcription Functions def submit_transcription(file, language, audio_format, diarization_enabled, speakers, profanity, punctuation, timestamps, lexical, user): """Submit transcription job - requires authenticated user""" if not user: return ( "❌ Please log in to submit transcriptions", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) if file is None: return ( "Please upload an audio or video file first.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) try: # Get file data try: if isinstance(file, str): if os.path.exists(file): with open(file, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file) else: return ( "File not found. Please try uploading again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) else: file_path = str(file) if os.path.exists(file_path): with open(file_path, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file_path) else: return ( "Unable to process file. Please try again.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) except Exception as e: return ( f"Error reading file: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Validate file file_extension = original_filename.split('.')[-1].lower() if '.' in original_filename else "" supported_extensions = set(AUDIO_FORMATS) | { 'mp4', 'mov', 'avi', 'mkv', 'webm', 'm4a', '3gp', 'f4v', 'wmv', 'asf', 'rm', 'rmvb', 'flv', 'mpg', 'mpeg', 'mts', 'vob' } if file_extension not in supported_extensions and file_extension != "": return ( f"Unsupported file format: .{file_extension}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Basic file size check if len(file_bytes) > 500 * 1024 * 1024: # 500MB limit return ( "File too large. Please upload files smaller than 500MB.", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Prepare settings settings = { 'audio_format': audio_format, 'diarization_enabled': diarization_enabled, 'speakers': speakers, 'profanity': profanity, 'punctuation': punctuation, 'timestamps': timestamps, 'lexical': lexical } # Submit job job_id = transcription_manager.submit_transcription( file_bytes, original_filename, user.user_id, language, settings ) # Update job state job_state = { 'current_job_id': job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'pending' } # Get updated user stats stats_display = get_user_stats_display(user) return ( f"🚀 Transcription started for: {original_filename}\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Job ID: {job_id}", job_state, gr.update(visible=True, value="🔄 Auto-refresh active"), stats_display ) except Exception as e: print(f"❌ Error submitting transcription: {str(e)}") return ( f"Error: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) def check_current_job_status(job_state, user): """Check status of current job with improved transcript handling""" if not user: return ( "❌ Please log in to check status", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) if not job_state or 'current_job_id' not in job_state: return ( "No active job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return ( "Job not found or access denied", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Calculate processing time processing_time = format_processing_time(job.created_at, job.completed_at) # Enhanced status change logging last_status = job_state.get('last_status', '') if job.status != last_status: print(f"🔄 [{user.username}] Job status change: {last_status} → {job.status} ({job.original_filename})") job_state['last_status'] = job.status # Get updated user stats stats_display = get_user_stats_display(user) # Handle completed status with better transcript detection if job.status == 'completed' and job.transcript_text and job.transcript_text.strip(): # Job is complete and transcript is available, stop auto-refresh job_state['auto_refresh_active'] = False # Create downloadable file try: transcript_file = create_transcript_file(job.transcript_text, job_id) print(f"✅ [{user.username}] Transcription ready: {len(job.transcript_text)} characters") except Exception as e: print(f"⚠️ [{user.username}] Error creating transcript file: {str(e)}") transcript_file = None return ( f"✅ Transcription completed in {processing_time}", job.transcript_text, gr.update(visible=True, value=transcript_file) if transcript_file else gr.update(visible=False), f"Processed: {job.original_filename}", gr.update(visible=False), # Hide auto-refresh status stats_display ) elif job.status == 'failed': # Job failed, stop auto-refresh job_state['auto_refresh_active'] = False error_msg = job.error_message[:100] + "..." if job.error_message and len(job.error_message) > 100 else job.error_message or "Unknown error" return ( f"❌ Transcription failed after {processing_time}", "", gr.update(visible=False), f"Error: {error_msg}", gr.update(visible=False), stats_display ) elif job.status == 'processing': # Still processing, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"🔄 Processing... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Converting and analyzing: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) elif job.status == 'completed' and (not job.transcript_text or not job.transcript_text.strip()): # Job marked as completed but transcript not yet available - keep refreshing auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"🔄 Finalizing transcript... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Retrieving results: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) else: # pending # Still pending, continue auto-refresh auto_refresh_active = job_state.get('auto_refresh_active', False) return ( f"⏳ Queued for processing... ({processing_time} waiting)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Waiting: {job.original_filename}", gr.update(visible=True, value="🔄 Auto-refresh active") if auto_refresh_active else gr.update(visible=False), stats_display ) except Exception as e: print(f"❌ Error checking job status: {str(e)}") return ( f"Error checking status: {str(e)}", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # AI Summary Functions def get_available_transcripts(user): """Get list of available transcripts for AI summarization""" if not user: return gr.update(choices=[], value=[]) try: # Get completed transcripts completed_jobs = transcription_manager.get_user_history(user.user_id, limit=50) completed_transcripts = [ job for job in completed_jobs if job.status == 'completed' and job.transcript_text ] # Create choices list choices = [] for job in completed_transcripts[:20]: # Limit to recent 20 label = f"{job.original_filename} ({job.created_at[:16]})" choices.append((label, job.job_id)) return gr.update(choices=choices, value=[]) except Exception as e: print(f"❌ Error getting available transcripts: {str(e)}") return gr.update(choices=[], value=[]) def submit_ai_summary_enhanced(existing_transcripts, new_audio_video_file, document_image_files, ai_instructions, summary_format, output_language, focus_areas, include_timestamps, include_action_items, user): """Enhanced AI summary submission with immediate transcript processing""" if not user: return ( "❌ Please log in to generate AI summaries", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # Determine content type and validate inputs has_existing_transcripts = existing_transcripts and len(existing_transcripts) > 0 has_new_audio_video = new_audio_video_file is not None has_document_images = document_image_files and len(document_image_files) > 0 if not has_existing_transcripts and not has_new_audio_video and not has_document_images: return ( "❌ Please provide content: select existing transcripts, upload audio/video file, or upload documents/images", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) if not ai_instructions.strip(): return ( "❌ Please provide AI instructions for the summary", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) try: # Handle new audio/video file - submit for transcription but don't create AI job yet transcription_job_id = None if has_new_audio_video: # Submit audio/video file to transcription service first try: # Read the uploaded file if isinstance(new_audio_video_file, str): file_path = new_audio_video_file else: file_path = str(new_audio_video_file) with open(file_path, 'rb') as f: file_bytes = f.read() original_filename = os.path.basename(file_path) # Use default transcription settings optimized for AI Summary transcription_settings = { 'audio_format': 'wav', 'diarization_enabled': True, 'speakers': 5, # Allow more speakers for conferences 'profanity': 'masked', 'punctuation': 'automatic', 'timestamps': True, 'lexical': False } # Submit to transcription service with Thai as default language transcription_job_id = transcription_manager.submit_transcription( file_bytes, original_filename, user.user_id, "th-TH", # Default to Thai transcription_settings ) print(f"🎙️ [{user.username}] Audio/video submitted for transcription: {transcription_job_id[:8]}...") # Create a special job state that will trigger AI summary when transcription completes summary_job_state = { 'waiting_for_transcription': True, 'transcription_job_id': transcription_job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'waiting_for_transcription', 'ai_instructions': ai_instructions, 'summary_format': summary_format, 'output_language': output_language, 'focus_areas': focus_areas, 'include_timestamps': include_timestamps, 'include_action_items': include_action_items, 'existing_transcripts': existing_transcripts if existing_transcripts else [], 'document_image_files': document_image_files if document_image_files else [], 'user_id': user.user_id } # Get updated user stats stats_display = get_user_stats_display(user) return ( f"🎙️ Audio/video submitted for transcription\n⏳ AI Summary will start automatically when transcription completes\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcription Job: {transcription_job_id[:8]}... → Will auto-trigger AI Summary", summary_job_state, gr.update(visible=True, value="🔄 Waiting for transcription"), stats_display ) except Exception as e: print(f"❌ Error submitting audio/video for transcription: {str(e)}") return ( f"❌ Error processing audio/video file: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) # For existing transcripts or documents only (no audio/video transcription needed) else: transcript_ids = existing_transcripts if existing_transcripts else [] document_files = document_image_files if document_image_files else [] # Determine content mode if has_existing_transcripts and not has_document_images: content_mode = "Existing Transcripts" elif has_document_images and not has_existing_transcripts: content_mode = "Text Documents" else: content_mode = "Mixed Content" # Prepare settings settings = { 'content_mode': content_mode, 'format': summary_format, 'output_language': output_language, 'focus_areas': focus_areas, 'include_timestamps': include_timestamps, 'include_action_items': include_action_items, 'language': "th-TH" } # Submit AI summary job immediately (no transcription needed) job_id = ai_summary_manager.submit_summary_job_enhanced( user_id=user.user_id, content_mode=content_mode, summary_type=summary_format, user_prompt=ai_instructions, existing_transcript_ids=transcript_ids, audio_video_files=[], document_files=document_files, settings=settings ) # Update job state summary_job_state = { 'current_summary_job_id': job_id, 'start_time': datetime.now().isoformat(), 'auto_refresh_active': True, 'last_status': 'pending' } # Get updated user stats stats_display = get_user_stats_display(user) # Create source description source_parts = [] if has_existing_transcripts: source_parts.append(f"{len(transcript_ids)} existing transcripts") if has_document_images: source_parts.append(f"{len(document_files)} document/image files") source_info = " + ".join(source_parts) return ( f"🤖 AI Summary started with {source_info}\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Job ID: {job_id}", summary_job_state, gr.update(visible=True, value="🔄 AI Auto-refresh active"), stats_display ) except Exception as e: print(f"❌ Error submitting enhanced AI summary: {str(e)}") return ( f"❌ Error: {str(e)}", "", gr.update(visible=False), "", {}, gr.update(visible=False), gr.update() ) def check_ai_summary_status(summary_job_state, user): """Check status of AI summary job with auto-trigger logic for completed transcriptions""" if not user: return ( "❌ Please log in to check AI summary status", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) if not summary_job_state: return ( "No active AI summary job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) try: # Handle special case: waiting for transcription to complete if summary_job_state.get('waiting_for_transcription'): transcription_job_id = summary_job_state.get('transcription_job_id') if not transcription_job_id: return ( "❌ Error: Missing transcription job ID", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Check transcription status transcription_job = transcription_manager.get_job_status(transcription_job_id) if not transcription_job: return ( "❌ Transcription job not found", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) processing_time = format_processing_time(summary_job_state['start_time']) if transcription_job.status == 'pending': return ( f"⏳ Transcription queued... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcription: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Waiting for transcription"), get_user_stats_display(user) ) elif transcription_job.status == 'processing': transcription_time = format_processing_time(transcription_job.created_at) return ( f"🎙️ Transcribing... ({transcription_time} transcribing, {processing_time} total)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Transcribing: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Transcription in progress"), get_user_stats_display(user) ) elif transcription_job.status == 'failed': summary_job_state['auto_refresh_active'] = False return ( f"❌ Transcription failed - Cannot proceed\nError: {transcription_job.error_message or 'Unknown error'}", "", gr.update(visible=False), f"Failed: {transcription_job.original_filename}", gr.update(visible=False), get_user_stats_display(user) ) elif transcription_job.status == 'completed': if not transcription_job.transcript_text or not transcription_job.transcript_text.strip(): return ( f"🔄 Transcription completed, retrieving text... ({processing_time} elapsed)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Getting transcript: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 Getting transcript"), get_user_stats_display(user) ) # TRANSCRIPTION IS COMPLETE! NOW TRIGGER AI SUMMARY IMMEDIATELY print(f"✅ Transcription completed, triggering AI summary immediately...") try: # Prepare transcript IDs including the newly completed one transcript_ids = summary_job_state.get('existing_transcripts', []) transcript_ids.append(transcription_job_id) # Prepare settings settings = { 'content_mode': "New Audio/Video Files", 'format': summary_job_state.get('summary_format', 'บทสรุปผู้บริหาร'), 'output_language': summary_job_state.get('output_language', 'Thai'), 'focus_areas': summary_job_state.get('focus_areas', ''), 'include_timestamps': summary_job_state.get('include_timestamps', True), 'include_action_items': summary_job_state.get('include_action_items', True), 'language': "th-TH" } # Submit AI summary job NOW with completed transcript job_id = ai_summary_manager.submit_summary_job_enhanced( user_id=summary_job_state['user_id'], content_mode="New Audio/Video Files", summary_type=summary_job_state.get('summary_format', 'บทสรุปผู้บริหาร'), user_prompt=summary_job_state.get('ai_instructions', ''), existing_transcript_ids=transcript_ids, audio_video_files=[], document_files=summary_job_state.get('document_image_files', []), settings=settings ) print(f"🤖 AI Summary job created immediately: {job_id}") # Update job state to track AI summary instead of transcription summary_job_state.update({ 'waiting_for_transcription': False, 'current_summary_job_id': job_id, 'transcription_completed_at': datetime.now().isoformat(), 'last_status': 'ai_started' }) return ( f"✅ Transcription done! 🤖 AI Summary started immediately\n📊 Using transcript: {len(transcription_job.transcript_text):,} characters\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Processing: {transcription_job.original_filename}", gr.update(visible=True, value="🔄 AI Summary active"), get_user_stats_display(user) ) except Exception as e: print(f"❌ Error triggering AI summary: {str(e)}") return ( f"❌ Transcription completed but AI summary failed to start: {str(e)}", "", gr.update(visible=False), "AI Summary creation failed", gr.update(visible=False), get_user_stats_display(user) ) # Normal AI summary job monitoring if 'current_summary_job_id' not in summary_job_state: return ( "No active AI summary job", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) job_id = summary_job_state['current_summary_job_id'] job = ai_summary_manager.get_summary_status(job_id) if not job or job.user_id != user.user_id: return ( "AI summary job not found or access denied", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) # Calculate processing time processing_time = format_processing_time(job.created_at, job.completed_at) # Enhanced status change logging last_status = summary_job_state.get('last_status', '') if job.status != last_status: print(f"🔄 [{user.username}] AI Summary status: {last_status} → {job.status}") summary_job_state['last_status'] = job.status # Get updated user stats stats_display = get_user_stats_display(user) # Handle completed status if job.status == 'completed' and job.summary_text and job.summary_text.strip(): # Job is complete, stop auto-refresh summary_job_state['auto_refresh_active'] = False # Create downloadable file try: summary_file = create_summary_file(job.summary_text, job_id) print(f"✅ [{user.username}] AI Summary ready: {len(job.summary_text)} characters") except Exception as e: print(f"⚠️ [{user.username}] Error creating summary file: {str(e)}") summary_file = None total_time = format_processing_time(summary_job_state['start_time']) return ( f"✅ AI Summary completed! Total time: {total_time}\n📊 Generated: {len(job.summary_text):,} characters", job.summary_text, gr.update(visible=True, value=summary_file) if summary_file else gr.update(visible=False), f"Completed: {', '.join(job.original_files)}", gr.update(visible=False), stats_display ) elif job.status == 'failed': # Job failed, stop auto-refresh summary_job_state['auto_refresh_active'] = False error_msg = job.error_message[:100] + "..." if job.error_message else "Unknown error" total_time = format_processing_time(summary_job_state['start_time']) return ( f"❌ AI Summary failed after {total_time}", "", gr.update(visible=False), f"Error: {error_msg}", gr.update(visible=False), stats_display ) elif job.status == 'processing': # Still processing, continue auto-refresh return ( f"🤖 AI analyzing and generating summary... ({processing_time} AI processing)\n📊 Creating comprehensive analysis\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"AI Processing: {', '.join(job.original_files[:2])}{'...' if len(job.original_files) > 2 else ''}", gr.update(visible=True, value="🔄 AI generating summary"), stats_display ) else: # pending # Still pending, continue auto-refresh return ( f"⏳ AI Summary queued... ({processing_time} waiting)\n📡 Auto-refreshing every 10 seconds...", "", gr.update(visible=False), f"Queued: {', '.join(job.original_files[:2])}{'...' if len(job.original_files) > 2 else ''}", gr.update(visible=True, value="🔄 AI queued"), stats_display ) except Exception as e: print(f"❌ Error checking AI summary status: {str(e)}") return ( f"Error checking AI summary status: {str(e)}", "", gr.update(visible=False), "", gr.update(visible=False), gr.update() ) def should_auto_refresh(job_state, user): """Check if auto-refresh should be active""" if not user or not job_state or not job_state.get('auto_refresh_active', False): return False if 'current_job_id' not in job_state: return False job_id = job_state['current_job_id'] try: job = transcription_manager.get_job_status(job_id) if not job or job.user_id != user.user_id: return False if job.status == 'failed': return False elif job.status == 'completed': if job.transcript_text and job.transcript_text.strip(): return False else: return True else: return True except Exception as e: print(f"❌ Error in should_auto_refresh: {str(e)}") return True def should_auto_refresh_summary(summary_job_state, user): """Check if AI summary auto-refresh should be active""" if not user or not summary_job_state or not summary_job_state.get('auto_refresh_active', False): return False if 'current_summary_job_id' not in summary_job_state: return False job_id = summary_job_state['current_summary_job_id'] try: job = ai_summary_manager.get_summary_status(job_id) if not job or job.user_id != user.user_id: return False if job.status in ['failed', 'completed']: return False else: return True except Exception as e: print(f"❌ Error in should_auto_refresh_summary: {str(e)}") return True def auto_refresh_status(job_state, user): """Auto-refresh function for transcription""" if not user: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) if should_auto_refresh(job_state, user): return check_current_job_status(job_state, user) else: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) def auto_refresh_ai_summary(summary_job_state, user): """Auto-refresh function for AI summary""" if not user: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) if should_auto_refresh_summary(summary_job_state, user): return check_ai_summary_status(summary_job_state, user) else: return ( gr.update(), gr.update(), gr.update(), gr.update(), gr.update(visible=False), gr.update() ) # History Functions def get_transcription_history_table(user, show_all=False): """Get transcription history table""" if not user: return [] try: limit = 100 if show_all else 20 transcript_jobs = transcription_manager.get_user_history(user.user_id, limit=limit) table_data = [] for job in transcript_jobs: try: created_time = datetime.fromisoformat(job.created_at) formatted_date = created_time.strftime("%Y-%m-%d %H:%M") except: formatted_date = job.created_at[:16] status_display = format_status(job.status) time_display = format_processing_time(job.created_at, job.completed_at) job_id_display = job.job_id[:8] + "..." if len(job.job_id) > 8 else job.job_id language_display = ALLOWED_LANGS.get(job.language, job.language) if job.status == 'completed' and job.transcript_text: download_status = "Available" else: download_status = status_display table_data.append([ formatted_date, job.original_filename, language_display, status_display, time_display, job_id_display, download_status ]) return table_data except Exception as e: print(f"❌ Error loading transcription history: {str(e)}") return [] def get_ai_summary_history_table(user, show_all=False): """Get AI summary history table""" if not user: return [] try: limit = 100 if show_all else 20 summary_jobs = ai_summary_manager.get_user_summary_history(user.user_id, limit=limit) table_data = [] for job in summary_jobs: try: created_time = datetime.fromisoformat(job.created_at) formatted_date = created_time.strftime("%Y-%m-%d %H:%M") except: formatted_date = job.created_at[:16] status_display = format_status(job.status) time_display = format_processing_time(job.created_at, job.completed_at) job_id_display = job.job_id[:8] + "..." if len(job.job_id) > 8 else job.job_id # Get source summary source_summary = f"{len(job.original_files)} sources" if len(job.original_files) <= 2: source_summary = ", ".join([f[:20] + "..." if len(f) > 20 else f for f in job.original_files]) if job.status == 'completed' and job.summary_text: download_status = "Available" else: download_status = status_display table_data.append([ formatted_date, source_summary, job.settings.get('output_language', 'Thai') if job.settings else 'Thai', status_display, time_display, job_id_display, download_status ]) return table_data except Exception as e: print(f"❌ Error loading AI summary history: {str(e)}") return [] def refresh_transcription_history(user, show_all=False): """Refresh transcription history and create download files""" if not user: return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) try: table_data = get_transcription_history_table(user, show_all) stats_display = get_user_stats_display(user) # Get completed transcription jobs for downloads completed_jobs = transcription_manager.get_user_history(user.user_id, limit=50) completed_transcripts = [ job for job in completed_jobs if job.status == 'completed' and job.transcript_text ] # Create download files download_updates = [] for i in range(5): if i < len(completed_transcripts): job = completed_transcripts[i] try: file_path = create_transcript_file(job.transcript_text, job.job_id) label = f"📄 {job.original_filename} ({job.created_at[:10]})" download_updates.append(gr.update(visible=True, value=file_path, label=label)) except Exception as e: print(f"Error creating transcript download: {e}") download_updates.append(gr.update(visible=False)) else: download_updates.append(gr.update(visible=False)) return [table_data, stats_display] + download_updates except Exception as e: print(f"❌ Error refreshing transcription history: {str(e)}") return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) def refresh_ai_summary_history(user, show_all=False): """Refresh AI summary history and create download files""" if not user: return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) try: table_data = get_ai_summary_history_table(user, show_all) stats_display = get_user_stats_display(user) # Get completed AI summary jobs for downloads completed_jobs = ai_summary_manager.get_user_summary_history(user.user_id, limit=50) completed_summaries = [ job for job in completed_jobs if job.status == 'completed' and job.summary_text ] # Create download files download_updates = [] for i in range(5): if i < len(completed_summaries): job = completed_summaries[i] try: file_path = create_summary_file(job.summary_text, job.job_id) source_name = job.original_files[0][:30] if job.original_files else "AI Summary" label = f"🤖 {source_name} ({job.created_at[:10]})" download_updates.append(gr.update(visible=True, value=file_path, label=label)) except Exception as e: print(f"Error creating summary download: {e}") download_updates.append(gr.update(visible=False)) else: download_updates.append(gr.update(visible=False)) return [table_data, stats_display] + download_updates except Exception as e: print(f"❌ Error refreshing AI summary history: {str(e)}") return [], gr.update(), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) # PDPA Compliance Functions (same as before, but updated for summaries too) def export_user_data(user): """Export comprehensive user data including summaries""" if not user: return "❌ Please log in to export your data", gr.update(visible=False) try: # Export transcript data transcript_export = transcription_manager.export_user_data(user.user_id) # Export AI summary data (if available) try: summary_export = { 'ai_summaries': [job.__dict__ for job in ai_summary_manager.get_user_summary_history(user.user_id, limit=1000)], 'summary_stats': transcription_manager.get_user_summary_stats(user.user_id) } except: summary_export = {'ai_summaries': [], 'summary_stats': {}} # Combine exports combined_export = { **transcript_export, **summary_export, 'export_type': 'comprehensive_azure_ai_service', 'services': ['transcription', 'ai_summarization'] } # Create export file os.makedirs("temp", exist_ok=True) filename = f"temp/user_data_export_{user.user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(combined_export, f, indent=2, ensure_ascii=False, default=str) print(f"📦 [{user.username}] Comprehensive data export created") return "✅ Your complete data (transcripts + AI summaries) has been exported successfully", gr.update(visible=True, value=filename, label="Download Your Complete Data Export") except Exception as e: print(f"❌ Error exporting comprehensive user data: {str(e)}") return f"❌ Export failed: {str(e)}", gr.update(visible=False) def update_marketing_consent(user, marketing_consent): """Update user's marketing consent""" if not user: return "❌ Please log in to update consent" try: success = transcription_manager.update_user_consent(user.user_id, marketing_consent) if success: user.marketing_consent = marketing_consent print(f"📧 [{user.username}] Marketing consent updated: {marketing_consent}") return f"✅ Marketing consent updated successfully" else: return "❌ Failed to update consent" except Exception as e: return f"❌ Error: {str(e)}" def delete_user_account(user, confirmation_text): """Delete user account and all data (transcripts + summaries)""" if not user: return "❌ Please log in to delete account", None, gr.update(visible=True), gr.update(visible=False) if confirmation_text != "DELETE MY ACCOUNT": return "❌ Please type 'DELETE MY ACCOUNT' to confirm", user, gr.update(visible=False), gr.update(visible=True) try: # Delete transcript data success = transcription_manager.delete_user_account(user.user_id) # Delete AI summary data (if backend supports it) try: transcription_manager.delete_user_summary_data(user.user_id) except Exception as e: print(f"⚠️ Warning: Could not delete AI summary data: {e}") if success: print(f"🗑️ [{user.username}] Complete account deleted (transcripts + AI summaries)") return "✅ Your account and all data (transcripts + AI summaries) have been permanently deleted", None, gr.update(visible=True), gr.update(visible=False) else: return "❌ Failed to delete account", user, gr.update(visible=False), gr.update(visible=True) except Exception as e: return f"❌ Error: {str(e)}", user, gr.update(visible=False), gr.update(visible=True) def on_user_login(user): """Update UI components when user logs in""" if user: return gr.update(value=user.marketing_consent) else: return gr.update(value=False) def create_transcript_file(transcript_text, job_id): """Create a downloadable transcript file""" os.makedirs("temp", exist_ok=True) filename = f"temp/transcript_{job_id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(transcript_text) return filename def create_summary_file(summary_text, job_id): """Create a downloadable AI summary file""" os.makedirs("temp", exist_ok=True) filename = f"temp/ai_summary_{job_id}.txt" with open(filename, "w", encoding="utf-8") as f: f.write(summary_text) return filename # Enhanced CSS with AI Summary styling enhanced_css = """ /* Main container styling */ .gradio-container { background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%); font-family: 'Segoe UI', system-ui, sans-serif; color: #212529; } /* Enhanced header styling */ .main-header { background: linear-gradient(135deg, #007bff, #0056b3); color: white; border: none; border-radius: 12px; padding: 30px; text-align: center; margin-bottom: 20px; box-shadow: 0 4px 12px rgba(0,123,255,0.3); } .main-header h1 { color: white; margin-bottom: 10px; font-size: 2.5em; font-weight: 700; text-shadow: 0 2px 4px rgba(0,0,0,0.3); } .main-header p { color: rgba(255,255,255,0.9); font-size: 1.2em; margin: 0; } /* Card styling with enhanced AI theme */ .gr-box { background: white; border: 1px solid #dee2e6; border-radius: 12px; box-shadow: 0 4px 12px rgba(0,0,0,0.08); padding: 25px; margin: 10px 0; transition: all 0.3s ease; } .gr-box:hover { box-shadow: 0 6px 16px rgba(0,0,0,0.12); transform: translateY(-2px); } /* AI-specific card styling */ .ai-summary-card { background: linear-gradient(135deg, #f8f9ff, #e8f2ff); border: 2px solid #007bff; border-radius: 12px; padding: 25px; margin: 15px 0; } /* Button styling with AI enhancements */ .gr-button { background: linear-gradient(135deg, #007bff, #0056b3); border: none; border-radius: 8px; color: white; font-weight: 600; padding: 14px 28px; transition: all 0.3s ease; box-shadow: 0 3px 6px rgba(0,123,255,0.3); text-transform: uppercase; letter-spacing: 0.5px; } .gr-button:hover { background: linear-gradient(135deg, #0056b3, #004085); transform: translateY(-2px); box-shadow: 0 5px 10px rgba(0,123,255,0.4); } /* AI Summary specific buttons */ .ai-button { background: linear-gradient(135deg, #28a745, #1e7e34); } .ai-button:hover { background: linear-gradient(135deg, #1e7e34, #155724); } /* Status displays with enhanced styling */ .status-display { background: linear-gradient(135deg, #e3f2fd, #bbdefb); border-left: 4px solid #2196f3; padding: 20px; border-radius: 0 12px 12px 0; margin: 15px 0; font-family: 'Monaco', 'Consolas', monospace; font-size: 14px; line-height: 1.6; } .ai-status-display { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border-left: 4px solid #28a745; } /* Auto-refresh indicators */ .auto-refresh-indicator { background: linear-gradient(135deg, #fff3cd, #ffeaa7); border: 2px solid #ffc107; border-radius: 8px; padding: 10px 16px; font-size: 12px; color: #856404; text-align: center; animation: pulse 2s infinite; font-weight: 600; } @keyframes pulse { 0%, 100% { opacity: 1; box-shadow: 0 0 0 0 rgba(255, 193, 7, 0.4); } 50% { opacity: 0.8; box-shadow: 0 0 0 10px rgba(255, 193, 7, 0); } } /* User stats with enhanced design */ .user-stats { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border: 2px solid #28a745; border-radius: 8px; padding: 12px 16px; font-size: 13px; color: #155724; text-align: center; font-weight: 600; box-shadow: 0 2px 8px rgba(40,167,69,0.2); } /* Enhanced input styling */ .gr-textbox, .gr-dropdown, .gr-file { border: 2px solid #e9ecef; border-radius: 10px; background: white; color: #212529; transition: all 0.3s ease; padding: 12px; font-size: 14px; } .gr-textbox:focus, .gr-dropdown:focus { border-color: #007bff; box-shadow: 0 0 0 4px rgba(0,123,255,0.1); background: #f8f9ff; } /* Tab styling with AI theme */ .tab-nav { background: white; border-bottom: 3px solid #007bff; border-radius: 12px 12px 0 0; box-shadow: 0 2px 8px rgba(0,0,0,0.1); } .tab-nav .tab-button { padding: 15px 25px; font-weight: 600; transition: all 0.3s ease; } .tab-nav .tab-button.selected { background: linear-gradient(135deg, #007bff, #0056b3); color: white; } /* History table with enhanced design and black text */ .history-table { background: white; border: 2px solid #dee2e6; border-radius: 12px; font-size: 14px; overflow: hidden; color: #000000; /* Force black text */ } .history-table thead th { background: linear-gradient(135deg, #343a40, #495057); color: white; font-weight: 700; padding: 16px 12px; text-align: center; border: none; } .history-table tbody tr { transition: all 0.2s ease; border-bottom: 1px solid #dee2e6; color: #000000; /* Force black text for rows */ } .history-table tbody tr:hover { background: linear-gradient(135deg, #f8f9ff, #e8f2ff); transform: scale(1.01); color: #000000; /* Ensure text stays black on hover */ } .history-table tbody td { padding: 12px; vertical-align: middle; text-align: center; border-right: 1px solid #dee2e6; color: #000000 !important; /* Force black text with !important */ font-weight: 500; } .history-table tbody td:last-child { border-right: none; } /* AI Summary specific elements */ .ai-content-sources { background: linear-gradient(135deg, #fff8e1, #ffecb3); border: 2px solid #ffa000; border-radius: 12px; padding: 20px; margin: 15px 0; } .ai-instructions { background: linear-gradient(135deg, #e8f5e8, #c8e6c9); border: 2px solid #4caf50; border-radius: 12px; padding: 20px; margin: 15px 0; } .ai-results { background: linear-gradient(135deg, #f3e5f5, #e1bee7); border: 2px solid #9c27b0; border-radius: 12px; padding: 20px; margin: 15px 0; } /* Enhanced file upload areas */ .file-upload-area { border: 3px dashed #007bff; border-radius: 12px; padding: 30px; text-align: center; background: linear-gradient(135deg, #f8f9ff, #e8f2ff); transition: all 0.3s ease; } .file-upload-area:hover { border-color: #0056b3; background: linear-gradient(135deg, #e8f2ff, #d3e8ff); } /* Progress indicators */ .progress-indicator { background: linear-gradient(90deg, #007bff, #28a745, #ffc107); height: 4px; border-radius: 2px; animation: progress 2s linear infinite; } @keyframes progress { 0% { width: 0%; } 50% { width: 70%; } 100% { width: 100%; } } /* Enhanced tooltips and help text */ .help-text { color: #6c757d; font-style: italic; font-size: 12px; margin-top: 5px; } /* Responsive design improvements */ @media (max-width: 768px) { .main-header h1 { font-size: 2em; } .gr-button { padding: 10px 20px; font-size: 14px; } .gr-box { padding: 15px; margin: 5px 0; } } """ # Create the main interface with gr.Blocks( theme=gr.themes.Soft( primary_hue="blue", secondary_hue="green", neutral_hue="gray", font=["system-ui", "sans-serif"] ), css=enhanced_css, title="🎙️🤖 Azure-Powered AI Conference Service - Advanced Transcription & Intelligent Summarization" ) as demo: # Global state current_user = gr.State(None) job_state = gr.State({}) summary_job_state = gr.State({}) # Header with gr.Row(): gr.HTML("""

🎙️🤖 Azure-Powered AI Conference Service

Advanced AI-powered conference analysis with transcription, computer vision, and intelligent summarization using Azure AI Foundry

""") # User stats display user_stats_display = gr.Textbox( label="", lines=1, interactive=False, show_label=False, placeholder="👤 Please log in to view your comprehensive statistics...", elem_classes=["user-stats"] ) # Authentication Section with gr.Column(visible=True, elem_classes=["auth-form"]) as auth_section: gr.Markdown("## 🔐 Authentication Required") gr.Markdown("Please log in or create an account to use the advanced AI-powered conference analysis service.") with gr.Tabs() as auth_tabs: # Login Tab with gr.Tab("🔒 Login") as login_tab: with gr.Column(): login_email = gr.Textbox( label="Email or Username", placeholder="Enter your email or username" ) login_password = gr.Textbox( label="Password", type="password", placeholder="Enter your password" ) with gr.Row(): login_btn = gr.Button("🔒 Login", variant="primary", elem_classes=["auth-button"]) login_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Enter your credentials and click Login" ) # Register Tab with gr.Tab("📝 Register") as register_tab: with gr.Column(): reg_email = gr.Textbox( label="Email", placeholder="Enter your email address" ) reg_username = gr.Textbox( label="Username", placeholder="Choose a username (3-30 characters, alphanumeric and underscore)" ) reg_password = gr.Textbox( label="Password", type="password", placeholder="Create a strong password (min 8 chars, mixed case, numbers)" ) reg_confirm_password = gr.Textbox( label="Confirm Password", type="password", placeholder="Confirm your password" ) gr.Markdown("### 📋 Privacy & Data Consent") with gr.Column(elem_classes=["privacy-notice"]): gr.Markdown(""" **Enhanced Privacy Notice**: By creating an account, you acknowledge that: - Your data will be stored securely in user-separated Azure Blob Storage - Transcriptions are processed using Azure Speech Services - AI summaries are generated using Azure OpenAI with advanced privacy protection - Computer vision analysis may be performed on uploaded images/videos - You can export or delete all your data (transcripts + AI summaries) at any time - We comply with GDPR and data protection regulations """) gdpr_consent = gr.Checkbox( label="I consent to the processing of my personal data as described in the Privacy Notice (Required)", value=False ) data_retention_consent = gr.Checkbox( label="I agree to data retention for transcription and AI analysis service functionality (Required)", value=False ) marketing_consent = gr.Checkbox( label="I consent to receiving marketing communications about new AI features (Optional)", value=False ) with gr.Row(): register_btn = gr.Button("📝 Create Account", variant="primary", elem_classes=["auth-button"]) register_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Fill out the form and agree to the required consents to create your account" ) login_after_register = gr.Button( "🔒 Go to Login", visible=False, variant="secondary" ) # Main Application (visible only when logged in) with gr.Column(visible=False) as main_app: # Logout button with gr.Row(): with gr.Column(scale=3): pass with gr.Column(scale=1): logout_btn = gr.Button("👋 Logout", variant="secondary") # Enhanced tabs with AI Summary with gr.Tabs(): # Transcription tab with gr.Tab("🎙️ Transcribe"): with gr.Row(): # Left column - Input settings with gr.Column(scale=1): gr.Markdown("### 📁 Upload File") file_upload = gr.File( label="Audio or Video File", type="filepath", file_types=[ ".wav", ".mp3", ".ogg", ".opus", ".flac", ".wma", ".aac", ".m4a", ".amr", ".webm", ".speex", ".mp4", ".mov", ".avi", ".mkv", ".wmv", ".flv", ".3gp" ], elem_classes=["file-upload-area"] ) with gr.Row(): language = gr.Dropdown( choices=[(v, k) for k, v in ALLOWED_LANGS.items()], label="Language", value="th-TH" # Default to Thai ) audio_format = gr.Dropdown( choices=AUDIO_FORMATS, value="wav", label="Output Format" ) gr.Markdown("### ⚙️ Advanced Settings") with gr.Row(): diarization_enabled = gr.Checkbox( label="Speaker Identification", value=True ) speakers = gr.Slider( minimum=1, maximum=10, step=1, value=2, label="Max Speakers" ) with gr.Row(): timestamps = gr.Checkbox( label="Timestamps", value=True ) profanity = gr.Dropdown( choices=["masked", "removed", "raw"], value="masked", label="Profanity Filter" ) with gr.Row(): punctuation = gr.Dropdown( choices=["automatic", "dictated", "none"], value="automatic", label="Punctuation" ) lexical = gr.Checkbox( label="Lexical Form", value=False ) submit_btn = gr.Button( "🚀 Start Transcription", variant="primary", size="lg" ) # Right column - Results with gr.Column(scale=1): gr.Markdown("### 📊 Status & Results") # Auto-refresh indicator auto_refresh_status_display = gr.Textbox( label="", lines=1, interactive=False, show_label=False, visible=False, elem_classes=["auto-refresh-indicator"] ) status_display = gr.Textbox( label="", lines=4, interactive=False, show_label=False, placeholder="Upload a file and click 'Start Transcription' to begin...\nStatus will auto-refresh every 10 seconds during processing.\nYour data is stored in your private user folder for PDPA compliance.\nCompleted transcripts can be used for AI summarization.", elem_classes=["status-display"] ) job_info = gr.Textbox( label="", lines=1, interactive=False, show_label=False, placeholder="" ) with gr.Row(): refresh_btn = gr.Button( "🔄 Check Status", variant="secondary" ) stop_refresh_btn = gr.Button( "⏹️ Stop Auto-Refresh", variant="secondary" ) gr.Markdown("### 📄 Transcript Results") transcript_output = gr.Textbox( label="Transcript", lines=12, interactive=False, placeholder="Your transcript with speaker identification and precise timestamps (HH:MM:SS) will appear here...\nThis transcript will be available for AI-powered summarization.", elem_classes=["status-display"] ) download_file = gr.File( label="Download Transcript", interactive=False, visible=False ) # AI Summary tab with proper structure with gr.Tab("🤖 AI Summary Conference"): gr.Markdown("### 🎯 AI-Powered Conference Summarization") gr.Markdown("*Generate intelligent summaries from transcripts, documents, audio/video files, and visual content using Azure AI Foundry*") with gr.Row(): # INPUT CONTENT Block with gr.Column(scale=1, elem_classes=["ai-content-sources"]): gr.Markdown("## 📂 INPUT CONTENT") # Part 1: Audio/Video Content with Tabs gr.Markdown("#### Audio/Video Content") with gr.Tabs(): with gr.Tab("📜 Existing Transcripts"): available_transcripts = gr.Dropdown( label="Select Transcript", choices=[], value=None, multiselect=True ) refresh_transcripts_btn = gr.Button( "🔄 Refresh Transcripts", variant="secondary", size="sm" ) with gr.Tab("🎥 New Audio/Video Files"): new_audio_video_file = gr.File( label="Upload Audio/Video File", file_count="single", file_types=[ ".mp4", ".mov", ".avi", ".mkv", ".webm", ".flv", ".3gp", ".wmv", ".wav", ".mp3", ".ogg", ".opus", ".flac", ".wma", ".aac", ".m4a", ".amr", ".speex" ], elem_classes=["file-upload-area"] ) # Part 2: Document/Image Uploads for OCR gr.Markdown("#### Document/Image Files (OCR Processing)") document_image_files = gr.File( label="Upload Documents/Images for OCR Text Extraction", file_count="multiple", file_types=[ ".pdf", ".docx", ".doc", ".pptx", ".ppt", ".xlsx", ".xls", ".txt", ".csv", ".json", ".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff", ".webp" ], elem_classes=["file-upload-area"] ) gr.HTML("""

Documents: PDF, DOCX, DOC, PPTX, PPT, XLSX, XLS, TXT, CSV, JSON

Images: JPG, JPEG, PNG, BMP, GIF, TIFF, WebP

OCR Process: Extract text from documents and images for AI analysis

""") # AI Instructions with gr.Column(scale=1, elem_classes=["ai-instructions"]): gr.Markdown("## 🧠 AI Instructions") gr.Markdown("#### Instructions for AI") ai_instructions = gr.Textbox( label="", lines=6, placeholder="Describe the conference type, desired format, and any corrections...\n\nExample: 'สรุปการประชุมรายไตรมาสนี้ โฟกัสที่ผลการเงิน การตัดสินใจสำคัญ และแผนงาน สร้างบทสรุปผู้บริหารพร้อมจุดสำคัญ'", value="สรุปเนื้อหาการประชุมหรือเอกสารนี้ โดยจัดรูปแบบเป็นหัวข้อและรายละเอียดสำคัญ พร้อมระบุประเด็นที่ต้องติดตาม", # Default Thai instructions show_label=False ) with gr.Row(): summary_format = gr.Dropdown( choices=["บทสรุปผู้บริหาร", "รายงานการประชุม", "แผนงานและภารกิจ", "ประเด็นสำคัญ", "การวิเคราะห์เชิงลึก"], value="บทสรุปผู้บริหาร", label="Format" ) output_language = gr.Dropdown( choices=["Thai", "English", "Spanish", "French", "German", "Chinese", "Japanese"], value="Thai", # Default to Thai label="Language" ) gr.Markdown("#### Focus Areas") focus_areas = gr.Textbox( label="", placeholder="เช่น ผลการเงิน การตัดสินใจ การพูดคุยด้านเทคนิค", show_label=False ) with gr.Row(): include_timestamps = gr.Checkbox( label="เวลา (Timestamps)", value=True ) include_action_items = gr.Checkbox( label="รายการภารกิจ", value=True ) generate_summary_btn = gr.Button( "🚀 สร้าง AI Summary", variant="primary", size="lg", elem_classes=["ai-button"] ) # Status & Results with gr.Column(scale=1, elem_classes=["ai-results"]): gr.Markdown("## 📊 Status & Results") ai_status_display = gr.Textbox( label="", lines=3, interactive=False, show_label=False, placeholder="ไม่มีงาน AI Summary ที่กำลังดำเนินการ", elem_classes=["ai-status-display"] ) # Auto-refresh indicator for AI ai_auto_refresh_status = gr.Textbox( label="", lines=1, interactive=False, show_label=False, visible=False, elem_classes=["auto-refresh-indicator"] ) ai_job_info = gr.Textbox( label="", lines=1, interactive=False, show_label=False, placeholder="" ) check_ai_status_btn = gr.Button( "🔄 Check Status", variant="secondary" ) gr.Markdown("#### AI Summary Results") ai_summary_output = gr.Textbox( label="", lines=12, interactive=False, show_label=False, placeholder="ผลลัพธ์ AI Summary พร้อมข้อมูลเชิงลึก ประเด็นสำคัญ และคำแนะนำที่สามารถนำไปปฏิบัติได้จะแสดงที่นี่...", elem_classes=["ai-status-display"] ) ai_download_file = gr.File( label="Download AI Summary", interactive=False, visible=False ) # Enhanced History tab with separate services with gr.Tab("📚 My History"): gr.Markdown("### 📋 Your Service History") gr.Markdown("*View and download your transcription and AI summarization history (PDPA compliant - only your data)*") # Service History Tabs with gr.Tabs(): # Transcription History Tab with gr.Tab("🎙️ Transcription History"): with gr.Row(): refresh_transcription_history_btn = gr.Button( "🔄 Refresh Transcription History", variant="primary" ) show_all_transcriptions_checkbox = gr.Checkbox( label="Show All Transcription Records (not just recent 20)", value=False ) transcription_history_table = gr.Dataframe( headers=["Date", "Filename", "Language", "Status", "Duration", "Job ID", "Download"], datatype=["str", "str", "str", "str", "str", "str", "str"], col_count=(7, "fixed"), row_count=(20, "dynamic"), wrap=True, interactive=False, elem_classes=["history-table"] ) # Download Files Section for Transcriptions gr.Markdown("### 📥 Download Your Transcripts") gr.Markdown("*Your available transcript downloads will appear below after refreshing*") with gr.Column(): transcript_download_1 = gr.File(label="", visible=False, interactive=False) transcript_download_2 = gr.File(label="", visible=False, interactive=False) transcript_download_3 = gr.File(label="", visible=False, interactive=False) transcript_download_4 = gr.File(label="", visible=False, interactive=False) transcript_download_5 = gr.File(label="", visible=False, interactive=False) # AI Summary History Tab with gr.Tab("🤖 AI Summary History"): with gr.Row(): refresh_ai_summary_history_btn = gr.Button( "🔄 Refresh AI Summary History", variant="primary" ) show_all_summaries_checkbox = gr.Checkbox( label="Show All AI Summary Records (not just recent 20)", value=False ) ai_summary_history_table = gr.Dataframe( headers=["Date", "Sources", "Language", "Status", "Duration", "Job ID", "Download"], datatype=["str", "str", "str", "str", "str", "str", "str"], col_count=(7, "fixed"), row_count=(20, "dynamic"), wrap=True, interactive=False, elem_classes=["history-table"] ) # Download Files Section for AI Summaries gr.Markdown("### 📥 Download Your AI Summaries") gr.Markdown("*Your available AI summary downloads will appear below after refreshing*") with gr.Column(): summary_download_1 = gr.File(label="", visible=False, interactive=False) summary_download_2 = gr.File(label="", visible=False, interactive=False) summary_download_3 = gr.File(label="", visible=False, interactive=False) summary_download_4 = gr.File(label="", visible=False, interactive=False) summary_download_5 = gr.File(label="", visible=False, interactive=False) # Enhanced Privacy & Data tab with gr.Tab("🔒 Privacy & Data"): gr.Markdown("### 🔒 Enhanced GDPR & Data Protection") gr.Markdown("Manage your personal data and privacy settings for both transcription and AI services in compliance with data protection regulations.") with gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### 📊 Complete Data Export") gr.Markdown("Download all your personal data including transcriptions, AI summaries, account info, and usage statistics.") export_btn = gr.Button("📦 Export My Complete Data", variant="primary") export_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Click 'Export My Complete Data' to download your comprehensive data archive (transcripts + AI summaries)" ) export_file = gr.File( label="Your Complete Data Export", visible=False, interactive=False ) with gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### 📧 Marketing Consent") gr.Markdown("Update your preferences for receiving marketing communications about new AI features.") marketing_consent_checkbox = gr.Checkbox( label="I consent to receiving marketing communications about new AI features", value=False ) update_consent_btn = gr.Button("✅ Update Consent", variant="secondary") consent_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Update your marketing consent preferences" ) with gr.Column(elem_classes=["pdpa-section"]): gr.Markdown("#### ⚠️ Complete Account Deletion") gr.Markdown(""" **Warning**: This action is irreversible and will permanently delete: - Your user account and profile - All transcription history and files - All AI summary history and results - All data stored in Azure Blob Storage - Usage statistics and preferences - Any stored AI model interactions """) deletion_confirmation = gr.Textbox( label="Type 'DELETE MY ACCOUNT' to confirm", placeholder="Type the exact phrase to confirm complete account deletion" ) delete_account_btn = gr.Button( "🗑️ Delete My Complete Account", variant="stop", elem_classes=["danger-button"] ) deletion_status = gr.Textbox( label="", show_label=False, interactive=False, placeholder="Complete account deletion requires confirmation text" ) # Auto-refresh timers transcript_timer = gr.Timer(10.0) ai_timer = gr.Timer(10.0) # Event handlers # Authentication events (same as before) login_btn.click( login_user, inputs=[login_email, login_password], outputs=[login_status, current_user, auth_section, main_app, user_stats_display] ).then( on_user_login, inputs=[current_user], outputs=[marketing_consent_checkbox] ).then( lambda user: ("", "") if user else (gr.update(), gr.update()), inputs=[current_user], outputs=[login_email, login_password] ).then( get_available_transcripts, inputs=[current_user], outputs=[available_transcripts] ) register_btn.click( register_user, inputs=[reg_email, reg_username, reg_password, reg_confirm_password, gdpr_consent, data_retention_consent, marketing_consent], outputs=[register_status, login_after_register] ).then( lambda status: ("", "", "", "", False, False, False) if "✅" in status else (gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()), inputs=[register_status], outputs=[reg_email, reg_username, reg_password, reg_confirm_password, gdpr_consent, data_retention_consent, marketing_consent] ) login_after_register.click( lambda: (gr.update(selected=0), ""), outputs=[auth_tabs, register_status] ) logout_btn.click( logout_user, outputs=[current_user, login_status, auth_section, main_app, user_stats_display] ) # Transcription events submit_btn.click( submit_transcription, inputs=[ file_upload, language, audio_format, diarization_enabled, speakers, profanity, punctuation, timestamps, lexical, current_user ], outputs=[status_display, transcript_output, download_file, job_info, job_state, auto_refresh_status_display, user_stats_display] ) refresh_btn.click( check_current_job_status, inputs=[job_state, current_user], outputs=[status_display, transcript_output, download_file, job_info, auto_refresh_status_display, user_stats_display] ) # AI Summary events - simplified structure refresh_transcripts_btn.click( lambda user: gr.update(choices=[(job.original_filename + f" ({job.created_at[:16]})", job.job_id) for job in transcription_manager.get_user_history(user.user_id, limit=50) if job.status == 'completed' and job.transcript_text] if user else []), inputs=[current_user], outputs=[available_transcripts] ) generate_summary_btn.click( submit_ai_summary_enhanced, inputs=[ available_transcripts, new_audio_video_file, document_image_files, ai_instructions, summary_format, output_language, focus_areas, include_timestamps, include_action_items, current_user ], outputs=[ai_status_display, ai_summary_output, ai_download_file, ai_job_info, summary_job_state, ai_auto_refresh_status, user_stats_display] ) check_ai_status_btn.click( check_ai_summary_status, inputs=[summary_job_state, current_user], outputs=[ai_status_display, ai_summary_output, ai_download_file, ai_job_info, ai_auto_refresh_status, user_stats_display] ) # Auto-refresh timer events transcript_timer.tick( auto_refresh_status, inputs=[job_state, current_user], outputs=[status_display, transcript_output, download_file, job_info, auto_refresh_status_display, user_stats_display] ) ai_timer.tick( auto_refresh_ai_summary, inputs=[summary_job_state, current_user], outputs=[ai_status_display, ai_summary_output, ai_download_file, ai_job_info, ai_auto_refresh_status, user_stats_display] ) # History events - Separate for each service with downloads refresh_transcription_history_btn.click( refresh_transcription_history, inputs=[current_user, show_all_transcriptions_checkbox], outputs=[transcription_history_table, user_stats_display, transcript_download_1, transcript_download_2, transcript_download_3, transcript_download_4, transcript_download_5] ) show_all_transcriptions_checkbox.change( refresh_transcription_history, inputs=[current_user, show_all_transcriptions_checkbox], outputs=[transcription_history_table, user_stats_display, transcript_download_1, transcript_download_2, transcript_download_3, transcript_download_4, transcript_download_5] ) refresh_ai_summary_history_btn.click( refresh_ai_summary_history, inputs=[current_user, show_all_summaries_checkbox], outputs=[ai_summary_history_table, user_stats_display, summary_download_1, summary_download_2, summary_download_3, summary_download_4, summary_download_5] ) show_all_summaries_checkbox.change( refresh_ai_summary_history, inputs=[current_user, show_all_summaries_checkbox], outputs=[ai_summary_history_table, user_stats_display, summary_download_1, summary_download_2, summary_download_3, summary_download_4, summary_download_5] ) # PDPA compliance events export_btn.click( export_user_data, inputs=[current_user], outputs=[export_status, export_file] ) update_consent_btn.click( update_marketing_consent, inputs=[current_user, marketing_consent_checkbox], outputs=[consent_status] ) delete_account_btn.click( delete_user_account, inputs=[current_user, deletion_confirmation], outputs=[deletion_status, current_user, auth_section, main_app] ) # Auto-hide/show speakers slider diarization_enabled.change( lambda enabled: gr.update(visible=enabled), inputs=[diarization_enabled], outputs=[speakers] ) # Load initial data demo.load( lambda: ( print("🚀 Azure-Powered AI Conference Service Started..."), get_user_stats_display(None) )[1], outputs=[user_stats_display] ) # Enhanced info section with demo: gr.HTML("""

📋 How to Use the Advanced AI Service

  1. 🔐 Register/Login: Create an account or log in with existing credentials
  2. 🎙️ Transcribe: Upload audio/video files for high-quality transcription with speaker identification
  3. 🤖 AI Analysis: Choose from 3 content types: existing transcripts, new audio/video, or OCR images
  4. 📊 Monitor: Status auto-updates every 10 seconds with real-time progress
  5. 📥 Download: Get transcripts and AI summaries with comprehensive insights
  6. 🔒 Manage: Use Privacy & Data tab to export or delete all your data

🆕 Enhanced AI Summary Features

📜 Existing Transcripts: Select from your completed transcriptions

🎥 New Audio/Video: Upload files for transcription + AI analysis

🖼️ OCR Images: Extract text from images using computer vision

🇹🇭 Thai Default: Optimized for Thai language processing

📚 Separate History: Independent tracking for each service

🔄 Auto-Refresh: Real-time status updates for all processes

🎵 Enhanced File Support

🔹 Video: MP4, MOV, AVI, MKV, WMV, FLV, 3GP, WebM

🎵 Audio: WAV, MP3, OGG, OPUS, FLAC, WMA, AAC, M4A

📄 Documents: PDF, DOCX, DOC, PPTX, PPT, XLSX, XLS, CSV, TXT

🖼️ Images: JPG, JPEG, PNG, BMP, GIF, TIFF, WebP (with OCR)

💡 Pro Tips for Best Results

🚀 Powered by Azure AI Foundry | 🔒 Enterprise-Grade Security | 🌐 GDPR Compliant | 🇹🇭 Thai Language Optimized

""") if __name__ == "__main__": print("🚀 Starting Advanced Azure-Powered AI Conference Service...") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True )