# app.py - Refactored to eliminate recorder_server.py dependency
import streamlit as st
import os
import tempfile
import json
from pathlib import Path
import time
import traceback
import streamlit.components.v1 as components
import hashlib
from datetime import datetime
# from st_audiorec import st_audiorec  # Import the new recorder component - OLD

# Reduce metrics/usage writes that can cause permission errors on hosted environments
try:
    st.set_option('browser.gatherUsageStats', False)
except Exception:
    # Best effort only: the app keeps working if the option cannot be applied.
    pass

# Robust component declaration: prefer local build, else fall back to pip package
parent_dir = os.path.dirname(os.path.abspath(__file__))
build_dir = os.path.join(parent_dir, "custom_components/st-audiorec/st_audiorec/frontend/build")


def st_audiorec(key=None):
    """Render the audio recorder component and return its current value.

    The locally built frontend in ``build_dir`` is preferred. When that
    directory does not exist, the pip-installed ``st_audiorec`` package is
    used instead.

    Args:
        key: Optional Streamlit widget key forwarded to the component.

    Returns:
        The value reported by the component (``0`` is the declared default
        for the local build), or ``None`` after a warning has been shown
        because no recorder could be rendered.
    """
    try:
        if os.path.isdir(build_dir):
            _component_func = components.declare_component("st_audiorec", path=build_dir)
            return _component_func(key=key, default=0)

        # Fallback to pip-installed component if available
        try:
            from st_audiorec import st_audiorec as st_audiorec_pkg
        except ImportError:
            st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
            return None
        # Called outside the ImportError guard so that a runtime failure of an
        # installed package is not misreported as "package missing".
        return st_audiorec_pkg(key=key)
    except Exception:
        # Final safety net
        st.warning("Failed to initialize audio recorder component.")
        return None


# --- Critical Imports and Initial Checks ---
AUDIO_PROCESSOR_CLASS = None
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    # Keep the formatted traceback so it can be displayed instead of crashing at import time.
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()

from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS
from exporter import BroadcastExporter, ExportConfig
from google_docs_config import google_docs_manager
from ai_questions import get_ai_question_engine, TextSelection
from style_fixes import apply_custom_styling, create_broadcast_bubble, create_white_container, create_processing_result_container
import requests
from dotenv import load_dotenv


# --- API Key Check ---
def check_api_key():
    """Check for the Gemini API key and display setup instructions if it is missing.

    Loads variables from ``.env`` via ``load_dotenv()`` and then looks up
    ``GEMINI_API_KEY`` in the environment.

    Returns:
        ``True`` when the key is set to a non-empty value; ``False`` after
        rendering an error and step-by-step instructions otherwise.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True

    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    # Each step sits on its own unindented line so Markdown renders a numbered
    # list rather than a single paragraph or a code block.
    st.markdown(
        "\n"
        "1. **Find the file named `.env.example`** in the `syncmaster2` directory.\n"
        "2. **Rename it to `.env`**.\n"
        "3. **Open the `.env` file** with a text editor.\n"
        "4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).\n"
        "5. **Paste your key** into the file, replacing `\"PASTE_YOUR_GEMINI_API_KEY_HERE\"`.\n"
        "6. **Save the file and restart the application.**\n"
    )
    return False


# --- Summary Helper (robust to cached translator without summarize_text) ---
def generate_summary(text: str, target_language: str = 'ar'):
    """Generate a concise summary in ``target_language``, with graceful fallback.

    The translator's ``summarize_text`` is tried first. If it is unavailable
    (e.g. a cached translator instance) or yields nothing, the Arabic
    summarizer is used and its output is translated to ``target_language``
    when that language is not Arabic.

    Args:
        text: Text to summarize; ``None`` or empty is passed on as ``''``.
        target_language: Language code for the summary (default ``'ar'``).

    Returns:
        A ``(summary, error)`` tuple. ``error`` is ``None`` on success. When
        translation of the Arabic fallback fails, the Arabic summary is
        returned together with the translation error. Exceptions are never
        raised; they are reported as ``(None, str(exception))``.
    """
    tr = get_translator()
    source_text = text or ''
    primary_err = None
    try:
        summarize = getattr(tr, 'summarize_text', None)
        if callable(summarize):
            s, primary_err = summarize(source_text, target_language=target_language)
            if s:
                return s, None

        # Fallback path: Arabic summary first
        summarize_ar = getattr(tr, 'summarize_text_arabic', None)
        if not callable(summarize_ar):
            # No fallback available: surface the primary error instead of an
            # opaque AttributeError message.
            return None, primary_err or "Summarization is not available on this translator instance."

        s_ar, err_ar = summarize_ar(source_text)
        if target_language and target_language != 'ar' and s_ar:
            tx, err_tx = tr.translate_text(s_ar, target_language=target_language)
            if tx:
                return tx, None
            return s_ar, err_tx

        if not s_ar and not err_ar:
            # Both paths produced nothing; report why the primary path failed.
            err_ar = primary_err
        return s_ar, err_ar
    except Exception as e:
        return None, str(e)
# --- Custom Feedback Functions (No Dimming Effect) ---
def show_processing_feedback(message: str, language: str = 'en'):
    """Show a processing notice without dimming the page.

    ``st.info`` is used instead of ``st.spinner`` to avoid the dimming effect.

    Args:
        message: Text to display after the 🔄 prefix.
        language: Interface language code. Accepted for call compatibility;
            the rendered text is the same for every language.

    Returns:
        The element returned by ``st.info``.
    """
    return st.info(f"🔄 {message}")


def show_status_update(message: str, status_type: str = 'info', language: str = 'en'):
    """Show a status message without dimming the page.

    Args:
        message: Text to display after the status icon.
        status_type: ``'success'``, ``'error'`` or ``'warning'``; any other
            value is rendered as an informational message.
        language: Interface language code. Accepted for call compatibility;
            it does not affect the output.
    """
    # Map each status to its emoji prefix and the Streamlit call that renders it.
    renderers = {
        'success': ("✅", st.success),
        'error': ("❌", st.error),
        'warning': ("⚠️", st.warning),
    }
    icon, render = renderers.get(status_type, ("ℹ️", st.info))
    render(f"{icon} {message}")


def cleanup_processing_state():
    """Reset processing bookkeeping in the session state after an error.

    ``processing_state`` is reset to its idle value only if it already
    exists. Every task in ``processing_status`` that is still ``'processing'``
    or ``'queued'`` is marked ``'failed'``.
    """
    if 'processing_state' in st.session_state:
        st.session_state.processing_state = {
            'is_processing': False,
            'current_task': '',
            'progress': 0.0,
            'message': '',
            'language': st.session_state.get('language', 'en')
        }

    # Clean up the per-task statuses as well
    if 'processing_status' in st.session_state:
        statuses = st.session_state.processing_status
        # Iterate over a snapshot because entries are reassigned inside the loop.
        for task_id, status in list(statuses.items()):
            if status in ('processing', 'queued'):
                statuses[task_id] = 'failed'


def show_processing_progress(message: str, progress: float = None, language: str = 'en'):
    """Show a processing notice together with a progress bar, without dimming.

    Args:
        message: Text to display after the 🔄 prefix.
        progress: Initial progress value passed to ``st.progress``. When
            ``None`` the bar starts at ``0``.
        language: Interface language code. Accepted for call compatibility;
            the rendered text is the same for every language.

    Returns:
        The progress-bar element returned by ``st.progress``.
    """
    st.info(f"🔄 {message}")
    # No known progress yet: start the bar at zero
    return st.progress(progress if progress is not None else 0)
# --- Page Configuration ---
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"
)


# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Inject JavaScript that logs messages to the browser's console.

    Args:
        messages: A single string or an iterable of values; each value is
            converted with ``str()`` and logged by its own ``console.log``
            call. ``None`` or an empty iterable logs nothing.
    """
    if messages is None:
        return
    if isinstance(messages, str):
        messages = [messages]

    # json.dumps yields a valid JavaScript string literal. "</" is escaped so
    # a message containing "</script>" cannot terminate the script element.
    escaped_messages = [json.dumps(str(msg)).replace("</", "<\\/") for msg in messages]
    if not escaped_messages:
        return

    log_calls = "\n".join(f"console.log({literal});" for literal in escaped_messages)
    js_code = f"<script>\n{log_calls}\n</script>"
    components.html(js_code, height=0, scrolling=False)


# --- AI Models Reset Function ---
def reset_ai_models():
    """Reset all AI models and clear cached instances.

    Removes every session-state key whose lowercase name contains one of the
    cache-related terms, reloads environment variables from ``.env``, evicts
    the AI helper modules from ``sys.modules`` so the next import is fresh,
    and clears their module-level singleton attributes when present.
    """
    import sys

    # Clear ALL session state keys that might contain cached AI instances
    cache_terms = ('translator', 'ai', 'question', 'model', 'engine', 'processing')
    for key in list(st.session_state.keys()):
        if any(term in key.lower() for term in cache_terms):
            del st.session_state[key]

    # Force reload environment variables
    load_dotenv(override=True)

    # Clear Python module cache completely
    for module_name in ('translator', 'ai_questions', 'exporter'):
        sys.modules.pop(module_name, None)

    # Clear any global instances (best effort: a module that fails to import
    # simply has nothing to reset)
    try:
        import translator
        if hasattr(translator, 'translator_instance'):
            translator.translator_instance = None
    except Exception:
        pass

    try:
        import ai_questions
        if hasattr(ai_questions, 'ai_question_engine'):
            ai_questions.ai_question_engine = None
    except Exception:
        pass


# --- Session State Initialization ---
def initialize_session_state():
    """Initialize the session state variables that do not exist yet.

    Existing values are never overwritten. The defaults are built on every
    call so mutable defaults (lists, dicts, sets) are never shared between
    sessions.
    """
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style',
            'text_color': '#FFFFFF',
            'highlight_color': '#FFD700',
            'background_color': '#000000',
            'font_family': 'Arial',
            'font_size': 48
        },
        'new_recording': None,
        # Transcript feed (prepend latest) and dedupe set
        'transcript_feed': [],  # list of {id, ts, text}
        'transcript_ids': set(),
        # Incremental broadcast state
        'broadcast_segments': [],  # [{id, recording_id, start_ms, end_ms, checksum, text}]
        'lastFetchedEnd_ms': 0,
        # Broadcast translation language (separate from general UI translation
        # target); defaults to Arabic
        'broadcast_translation_lang': 'ar',
        # Default summary language to Arabic
        'summary_language': 'ar',
        # Auto-generate Arabic summary toggle
        'auto_generate_summary': True,
        # Export functionality state
        'export_timestamp': None,
        'show_export_modal': False,
        'export_format': 'word',
        # AI Questions functionality state
        'selected_text': None,
        'selected_segment_id': None,
        'show_question_modal': False,
        'current_question_session': None,
        'preferred_ai_model': 'auto',
        'preferred_answer_language': 'auto',
        # Background processing state
        'processing_queue': [],
        'processing_results': {},
        'processing_status': {},
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
# --- Background Audio Processing Function ---
def queue_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Append an audio clip to the processing queue.

    Args:
        audio_bytes: Raw audio content to process later.
        original_filename: Name stored with the task (default
            ``"recorded_audio.wav"``).

    Returns:
        The 8-character task ID assigned to the queued clip.
    """
    import uuid

    # Short unique ID for this processing task
    task_id = str(uuid.uuid4())[:8]

    st.session_state.processing_queue.append({
        'id': task_id,
        'audio_bytes': audio_bytes,
        'filename': original_filename,
        'timestamp': time.time(),
        'status': 'queued'
    })
    st.session_state.processing_status[task_id] = 'queued'

    # Show immediate feedback
    if st.session_state.language == 'ar':
        notice = 'تم إضافة التسجيل للمعالجة'
    else:
        notice = 'Audio queued for processing'
    st.info(f"🔄 {notice} (ID: {task_id})")

    return task_id


def process_queued_audio():
    """Process the first queued audio task, if any.

    The task is marked ``'processing'``, run through
    ``run_audio_processing_sync`` and then marked ``'completed'`` (result
    stored in ``processing_results``) or ``'failed'``. It is removed from the
    queue afterwards in both cases.
    """
    if not st.session_state.processing_queue:
        return

    # Only the head of the queue is handled per call
    task = st.session_state.processing_queue[0]
    task_id = task['id']

    def set_status(new_status):
        # Keep the status table and the task record in step.
        st.session_state.processing_status[task_id] = new_status
        task['status'] = new_status

    def localized(arabic, english):
        return arabic if st.session_state.language == 'ar' else english

    set_status('processing')

    # Custom feedback is used here instead of st.status
    feedback_placeholder = show_processing_feedback(
        f"{localized('معالجة التسجيل', 'Processing audio')} {task_id}...",
        st.session_state.language,
    )

    try:
        result = run_audio_processing_sync(task['audio_bytes'], task['filename'])

        # Remove the "processing" notice
        feedback_placeholder.empty()

        if result:
            st.session_state.processing_results[task_id] = result
            set_status('completed')
            show_status_update(
                f"{localized('تم الانتهاء من المعالجة', 'Processing completed')} {task_id}",
                'success',
                st.session_state.language,
            )
        else:
            set_status('failed')
            show_status_update(
                f"{localized('فشلت المعالجة', 'Processing failed')} {task_id}",
                'error',
                st.session_state.language,
            )
    except Exception as e:
        # Remove the "processing" notice on failure as well
        feedback_placeholder.empty()
        set_status('failed')
        show_status_update(
            f"{localized('خطأ في المعالجة', 'Processing error')} {task_id}: {str(e)}",
            'error',
            st.session_state.language,
        )
        cleanup_processing_state()

    # Remove from queue
    st.session_state.processing_queue.pop(0)
# --- Centralized Audio Processing Function ---
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Process audio either through the background queue or synchronously.

    Args:
        audio_bytes: Raw audio content.
        original_filename: Name used to derive the temp-file suffix.

    Returns:
        The queued task ID when the ``background_processing`` session flag is
        truthy (the default when the flag is absent); otherwise the value
        returned by ``run_audio_processing_sync``.
    """
    if st.session_state.get('background_processing', True):
        return queue_audio_processing(audio_bytes, original_filename)
    return run_audio_processing_sync(audio_bytes, original_filename)


def _run_processor_with_feedback(message_en, message_ar, processor_call):
    """Run ``processor_call`` while a localized notice is shown, and return its result.

    The notice is cleared afterwards. On failure the processing state is also
    reset and the exception is re-raised.
    """
    # Custom feedback is used instead of st.spinner
    processing_message = message_en if st.session_state.language == 'en' else message_ar
    feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)
    try:
        outcome = processor_call()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        raise  # bare raise keeps the original traceback
    # Remove the "processing" notice
    feedback_placeholder.empty()
    return outcome


def _update_transcript_feed(audio_bytes, text):
    """Prepend ``text`` to the transcript feed (deduped by audio digest) and rebuild ``edited_text``."""
    # MD5 serves only as a dedupe fingerprint here, not as a security measure.
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        digest = f"snap-{int(time.time()*1000)}"

    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0, {"id": digest, "ts": int(time.time() * 1000), "text": text}
        )

    # Rebuild edited_text with newest first
    # NOTE(review): the original indentation was lost; this is assumed to run
    # whether or not a new entry was added - confirm.
    st.session_state.edited_text = "\n\n".join(
        [s["text"] for s in st.session_state.transcript_feed]
    )


def run_audio_processing_sync(audio_bytes, original_filename="recorded_audio.wav"):
    """Transcribe audio (optionally with translation) and store the result in session state.

    The bytes are written to a temporary file, handed to the audio processor,
    and the outcome is saved in ``st.session_state.transcription_data`` and
    the transcript feed. The temporary file is always removed.

    Args:
        audio_bytes: Raw audio content.
        original_filename: Name used to derive the temp-file suffix.

    Returns:
        A dict with ``original_text``, ``translated_text``,
        ``detected_language``, ``translation_success`` and
        ``word_timestamps``, or ``None`` when there is no input, the processor
        produced nothing, or an error occurred (errors are rendered in the UI
        rather than raised).
    """
    # This function is the classic, non-Custom path; ensure editor sections are enabled
    st.session_state['_custom_active'] = False

    if not audio_bytes:
        st.error("No audio data provided to process.")
        return

    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")

    try:
        suffix = Path(original_filename).suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Record the path before writing so the file is cleaned up even
            # if the write itself fails.
            tmp_file_path = tmp_file.name
            tmp_file.write(audio_bytes)

        processor = AUDIO_PROCESSOR_CLASS()

        # Determine which processing path to take
        if st.session_state.enable_translation:
            result_data, processor_logs = _run_processor_with_feedback(
                "Performing AI Transcription & Translation... please wait.",
                "جاري تنفيذ النسخ والترجمة بالذكاء الاصطناعي... يرجى الانتظار.",
                lambda: processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language,
                ),
            )
            log_to_browser_console(processor_logs)

            if not result_data or not result_data.get("original_text"):
                st.warning(
                    "Could not generate transcription with translation. Check browser console (F12) for logs."
                )
                return

            st.session_state.transcription_data = {
                "text": result_data["original_text"],
                "translated_text": result_data["translated_text"],
                "word_timestamps": result_data["word_timestamps"],
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": result_data.get("translation_success", False),
                "detected_language": result_data.get("language_detected", "unknown"),
            }
            _update_transcript_feed(audio_bytes, result_data["original_text"])

            outcome = {
                'original_text': result_data.get("original_text"),
                'translated_text': result_data.get("translated_text"),
                'detected_language': result_data.get("language_detected"),
                'translation_success': result_data.get("translation_success", False),
                'word_timestamps': result_data.get("word_timestamps"),
            }
        else:
            # Standard processing without translation
            word_timestamps, processor_logs = _run_processor_with_feedback(
                "Performing AI Transcription... please wait.",
                "جاري تنفيذ النسخ بالذكاء الاصطناعي... يرجى الانتظار.",
                lambda: processor.get_word_timestamps(tmp_file_path),
            )
            log_to_browser_console(processor_logs)

            if not word_timestamps:
                st.warning(
                    "Could not generate timestamps. Check browser console (F12) for logs."
                )
                return

            full_text = " ".join([d["word"] for d in word_timestamps])
            st.session_state.transcription_data = {
                "text": full_text,
                "word_timestamps": word_timestamps,
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": False,
            }
            _update_transcript_feed(audio_bytes, full_text)

            outcome = {
                'original_text': full_text,
                'translated_text': None,
                'detected_language': "unknown",
                'translation_success': False,
                'word_timestamps': word_timestamps,
            }

        st.session_state.step = 1  # Keep it on the same step

        # Return result for background processing
        return outcome

    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
        return None
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
# --- Main Application Logic ---
def _ui_text(arabic, english):
    """Return ``arabic`` when the interface language is Arabic, otherwise ``english``."""
    return arabic if st.session_state.language == 'ar' else english


def _render_language_settings():
    """Render the interface-language, translation and auto-summary controls."""
    st.markdown("## 🌐 Language Settings")
    language_options = {'English': 'en', 'العربية': 'ar'}
    selected_lang_display = st.selectbox(
        "Interface Language",
        options=list(language_options.keys()),
        index=0 if st.session_state.language == 'en' else 1
    )
    st.session_state.language = language_options[selected_lang_display]

    st.markdown("## 🔤 Translation Settings")
    st.session_state.enable_translation = st.checkbox(
        _ui_text("تفعيل الترجمة بالذكاء الاصطناعي", "Enable AI Translation"),
        value=st.session_state.enable_translation,
        help=_ui_text("ترجمة النص تلقائياً", "Automatically translate transcribed text")
    )

    if st.session_state.enable_translation:
        target_lang_options = {
            'Arabic (العربية)': 'ar',
            'English': 'en',
            'French (Français)': 'fr',
            'Spanish (Español)': 'es'
        }
        # Start from the stored target language (as the interface-language
        # selectbox does) so re-creating the widget does not reset it to Arabic.
        target_codes = list(target_lang_options.values())
        current_target = st.session_state.target_language
        selected_target = st.selectbox(
            _ui_text("اللغة المستهدفة", "Target Language"),
            options=list(target_lang_options.keys()),
            index=target_codes.index(current_target) if current_target in target_codes else 0
        )
        st.session_state.target_language = target_lang_options[selected_target]

    # Auto summary toggle
    # NOTE(review): original indentation was lost; assumed to be independent
    # of the translation checkbox - confirm.
    st.session_state.auto_generate_summary = st.checkbox(
        _ui_text("توليد الملخص العربي تلقائياً", "Auto-generate Arabic summary"),
        value=st.session_state.auto_generate_summary
    )


def _probe_ai_services(translator):
    """Return a ``{service name: status emoji}`` dict for Gemini, Groq and OpenRouter."""
    services_status = {}

    # Test Gemini
    # NOTE: this sends a real generate_content request each time it runs.
    try:
        if hasattr(translator, 'model') and translator.model:
            translator.model.generate_content("Test")
            services_status['Gemini'] = "✅"
        else:
            services_status['Gemini'] = "❌"
    except Exception as e:
        error_str = str(e)
        if "429" in error_str or "quota" in error_str.lower():
            services_status['Gemini'] = "⚠️"
        else:
            services_status['Gemini'] = "❌"

    # Groq and OpenRouter count as available when the completion method
    # exists and an API key is set; no request is sent.
    for service, method_name, key_attr in (
        ('Groq', '_groq_complete', 'groq_api_key'),
        ('OpenRouter', '_openrouter_complete', 'openrouter_api_key'),
    ):
        try:
            configured = hasattr(translator, method_name) and getattr(translator, key_attr)
            services_status[service] = "✅" if configured else "❌"
        except Exception:
            services_status[service] = "❌"

    return services_status


def _render_ai_status(question_engine):
    """Render preferred model/answer language, service availability and usage statistics."""
    st.markdown("## 🤖 AI Questions")

    # Show preferred model
    if st.session_state.preferred_ai_model != 'auto':
        st.info(f"🎯 {_ui_text('النموذج المفضل', 'Preferred Model')}: {st.session_state.preferred_ai_model}")
    else:
        st.info("🔄 " + _ui_text("تلقائي", "Auto selection"))

    # Model reset button
    if st.button("🔄 " + _ui_text("إعادة تعيين النماذج", "Reset AI Models"),
                 help=_ui_text("إعادة تحميل نماذج الذكاء الاصطناعي", "Reload AI models")):
        reset_ai_models()
        st.success("✅ " + _ui_text("تم إعادة تعيين النماذج", "AI models reset successfully"))
        st.rerun()

    # Show preferred answer language
    answer_lang = st.session_state.get('preferred_answer_language', 'auto')
    if answer_lang != 'auto':
        lang_names = {'ar': '🇸🇦 العربية', 'en': '🇺🇸 English', 'fr': '🇫🇷 Français', 'es': '🇪🇸 Español', 'de': '🇩🇪 Deutsch', 'zh': '🇨🇳 中文'}
        lang_display = lang_names.get(answer_lang, answer_lang)
        st.info(f"🌐 {_ui_text('لغة الإجابة', 'Answer Language')}: {lang_display}")
    else:
        current_ui_lang = _ui_text("🇸🇦 العربية", "🇺🇸 English")
        st.info(f"🌐 {_ui_text('لغة الإجابة', 'Answer Language')}: {current_ui_lang} ({_ui_text('تلقائي', 'Auto')})")

    # Test AI services availability
    services_status = _probe_ai_services(get_translator())

    # Display services status
    st.markdown("**" + _ui_text("حالة النماذج", "Models Status") + ":**")
    for service, status in services_status.items():
        if status == "✅":
            st.success(f"{status} {service}")
        elif status == "⚠️":
            st.warning(f"{status} {service} " + _ui_text("(حد يومي)", "(quota)"))
        else:
            st.error(f"{status} {service}")

    # Overall status
    available_count = sum(1 for status in services_status.values() if status == "✅")
    if available_count > 0:
        st.info(f"🤖 {available_count}/{len(services_status)} " + _ui_text("نماذج متاحة", "models available"))
    else:
        st.warning(_ui_text("⚠️ جميع النماذج غير متاحة", "⚠️ All models unavailable"))

    # Show conversation status and usage stats
    usage_stats = question_engine.get_model_usage_stats()
    total_questions = sum(usage_stats.values())

    session = None
    if st.session_state.current_question_session:
        session = question_engine.get_conversation_history(st.session_state.current_question_session)
    if session and session.conversation:
        st.info(f"💬 {len(session.conversation)} " + _ui_text("أسئلة نشطة", "active questions"))
    else:
        st.info(_ui_text("🤖 جاهز للأسئلة", "🤖 Ready for questions"))

    # Show usage statistics
    if total_questions > 0:
        with st.expander(_ui_text("📊 إحصائيات الاستخدام", "📊 Usage Statistics")):
            st.write(f"**{_ui_text('إجمالي الأسئلة', 'Total Questions')}: {total_questions}**")
            for model, count in usage_stats.items():
                if count > 0:
                    percentage = (count / total_questions) * 100
                    st.write(f"• {model}: {count} ({percentage:.1f}%)")
    else:
        st.caption(_ui_text("📊 لا توجد إحصائيات بعد", "📊 No statistics yet"))


def _render_quick_switchers(question_engine):
    """Render the quick model and answer-language switch buttons."""
    # Quick model switcher
    st.markdown("**" + _ui_text("تبديل سريع للنموذج", "Quick Model Switch") + "**")
    models_status = question_engine.check_model_availability()

    # Create buttons for each available model
    cols = st.columns(2)
    model_names = ['auto', 'Gemini AI', 'Groq AI', 'OpenRouter AI']
    for i, model in enumerate(model_names):
        with cols[i % 2]:
            if model == 'auto':
                button_text = _ui_text("🔄 تلقائي", "🔄 Auto")
            else:
                icon = models_status.get(model, {}).get('icon', '❓')
                button_text = f"{icon} {model.split()[0]}"  # Show first word + icon
            is_current = st.session_state.preferred_ai_model == model
            button_type = "primary" if is_current else "secondary"
            if st.button(button_text, key=f"switch_{model}", type=button_type, use_container_width=True):
                st.session_state.preferred_ai_model = model
                st.rerun()

    # Quick language switcher
    st.markdown("**" + _ui_text("تبديل سريع للغة", "Quick Language Switch") + "**")
    language_buttons = {
        'auto': _ui_text("🔄 تلقائي", "🔄 Auto"),
        'ar': "🇸🇦 عربي",
        'en': "🇺🇸 EN",
        'fr': "🇫🇷 FR",
        'es': "🇪🇸 ES"
    }
    cols_lang = st.columns(3)
    for i, (lang_code, button_text) in enumerate(language_buttons.items()):
        with cols_lang[i % 3]:
            is_current_lang = st.session_state.get('preferred_answer_language', 'auto') == lang_code
            button_type_lang = "primary" if is_current_lang else "secondary"
            if st.button(button_text, key=f"switch_lang_{lang_code}", type=button_type_lang, use_container_width=True):
                st.session_state.preferred_answer_language = lang_code
                st.rerun()


def _render_processing_status():
    """Render queue/result counters and the "Clear All" button when there is anything to show."""
    if not (st.session_state.processing_queue or st.session_state.processing_results):
        return

    st.markdown("## 🔄 " + _ui_text("حالة المعالجة", "Processing Status"))

    # Queue status
    if st.session_state.processing_queue:
        queue_count = len(st.session_state.processing_queue)
        st.warning(f"⏳ {queue_count} " + _ui_text("في الانتظار", "in queue"))

    # Results count
    if st.session_state.processing_results:
        results_count = len(st.session_state.processing_results)
        st.success(f"✅ {results_count} " + _ui_text("مكتمل", "completed"))

    # Clear all button
    if st.button("🗑️ " + _ui_text("مسح الكل", "Clear All")):
        st.session_state.processing_queue = []
        st.session_state.processing_results = {}
        st.session_state.processing_status = {}
        st.rerun()


def _render_sidebar():
    """Render the whole sidebar in its original top-to-bottom order."""
    # NOTE(review): original indentation was lost; every section up to the
    # page title is assumed to live in the sidebar - confirm.
    with st.sidebar:
        _render_language_settings()

        # Google Account Status
        st.markdown("## 🔐 Google Account")
        if google_docs_manager.is_authenticated():
            st.success(_ui_text("✅ متصل", "✅ Connected"))
        else:
            st.info(_ui_text("🔒 غير متصل", "🔒 Not connected"))

        # AI Questions Status
        question_engine = get_ai_question_engine()
        _render_ai_status(question_engine)
        _render_quick_switchers(question_engine)

        # Processing status
        _render_processing_status()


def _render_recording_settings():
    """Render the snapshot/retention settings expander and store the chosen values."""
    # Global settings for long recording retention and custom snapshot duration
    with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False):
        st.session_state.setdefault('retention_minutes', 30)
        # 0 means: use full buffer by default for Custom
        st.session_state.setdefault('custom_snapshot_seconds', 0)
        # Auto-Custom interval seconds (for frontend auto trigger)
        st.session_state.setdefault('auto_custom_interval_sec', 10)
        # Auto-start incremental snapshots when recording begins
        st.session_state.setdefault('auto_start_custom', False)

        st.session_state.retention_minutes = st.number_input(
            "Retention window (minutes)",
            min_value=5, max_value=240,
            value=st.session_state.retention_minutes)
        st.session_state.custom_snapshot_seconds = st.number_input(
            "Custom snapshot (seconds; 0 = full buffer)",
            min_value=0, max_value=3600,
            value=st.session_state.custom_snapshot_seconds)
        st.session_state.auto_custom_interval_sec = st.number_input(
            "Auto Custom interval (seconds)",
            min_value=1, max_value=3600,
            value=st.session_state.auto_custom_interval_sec,
            help="How often to auto-trigger the same Custom action while recording.")
        st.session_state.auto_start_custom = st.checkbox(
            "Auto-start incremental snapshots on record",
            value=st.session_state.auto_start_custom,
            help="Start sending Custom intervals automatically as soon as you start recording.")


def main():
    """Render the application page.

    Applies styling, reloads environment variables, resets AI caches on the
    first run of a session, initializes session state, then renders the
    sidebar, the recording settings, the audio-source step, the background
    queue, the processing results and the question/export modals.
    """
    # Apply custom styling first
    apply_custom_styling()

    # Force reload environment variables
    load_dotenv(override=True)

    # Clear AI models cache on first run or if there's an issue
    if 'app_initialized' not in st.session_state:
        reset_ai_models()
        st.session_state.app_initialized = True

    initialize_session_state()

    # NOTE(review): this markup payload is blank as written - verify its
    # content was not stripped.
    st.markdown(""" """, unsafe_allow_html=True)

    _render_sidebar()

    st.title("🎵 SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")

    # Simplified interface - removed step indicators as requested
    _render_recording_settings()

    # Inject globals into the page for the component to pick up
    # NOTE(review): the injected payload is blank as written - verify its
    # content was not stripped.
    components.html(f""" """, height=0)

    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()

    step_1_upload_and_process()

    # Process background queue
    if st.session_state.get('background_processing', True) and st.session_state.processing_queue:
        process_queued_audio()

    # Show processing results optionally
    if st.session_state.get('show_processing_results', False):
        show_processing_results()
    elif st.session_state.processing_results:
        # Show a button to view results if there are any
        if st.button("📝 " + _ui_text("عرض نتائج المعالجة", "Show Processing Results") + f" ({len(st.session_state.processing_results)})", type="secondary"):
            st.session_state.show_processing_results = True
            st.rerun()

    # Note: step_2_review_and_customize removed as requested
    # Results are now shown in show_processing_results() function

    # AI Question modal (show outside of other components)
    if st.session_state.show_question_modal:
        show_question_modal()

    # Export modal (show outside of other components)
    if st.session_state.show_export_modal:
        show_export_modal()
# --- Show Processing Results ---
def show_processing_results():
    """Show the stored processing results on the current page.

    Renders a header with a hide button, an optional combined view when more
    than one result exists, and one expander per result with copy and delete
    actions. Does nothing when there are no results.
    """
    if not st.session_state.processing_results:
        return

    def _t(arabic, english):
        # Localize a label according to the interface language.
        return arabic if st.session_state.language == 'ar' else english

    st.markdown("---")

    # Header with results count and close button
    col_header, col_close = st.columns([4, 1])
    with col_header:
        results_count = len(st.session_state.processing_results)
        st.subheader(f"📝 {_t('نتائج المعالجة', 'Processing Results')} ({results_count})")
    with col_close:
        if st.button("❌ " + _t("إخفاء", "Hide"), key="hide_results"):
            st.session_state.show_processing_results = False
            st.rerun()

    if results_count > 1:
        # Show all results in one view option
        show_all = st.checkbox(
            _t("عرض جميع النتائج مجمعة", "Show all results combined"),
            help=_t("عرض جميع النصوص والترجمات في مكان واحد", "Display all texts and translations in one place")
        )

        if show_all:
            # Combined view
            results = st.session_state.processing_results.values()

            st.markdown("### " + _t("النصوص الأصلية مجمعة", "Combined Original Texts"))
            combined_original = "\n\n".join(
                [result.get('original_text', '') for result in results if result.get('original_text')]
            )
            if combined_original:
                st.write(combined_original)
                if st.button("📋 " + _t("نسخ جميع النصوص", "Copy All Texts")):
                    st.code(combined_original, language=None)

            st.markdown("### " + _t("الترجمات مجمعة", "Combined Translations"))
            combined_translation = "\n\n".join(
                [result.get('translated_text', '') for result in results if result.get('translated_text')]
            )
            if combined_translation:
                st.write(combined_translation)
                if st.button("📋 " + _t("نسخ جميع الترجمات", "Copy All Translations")):
                    st.code(combined_translation, language=None)

            st.markdown("---")

    # Show results for each completed processing.
    # Iterate over a snapshot because the delete button removes entries from
    # the live mapping inside the loop.
    for task_id, result in list(st.session_state.processing_results.items()):
        with st.expander(f"🎵 {_t('التسجيل', 'Recording')} {task_id}", expanded=True):
            # NOTE(review): the containers below are rendered with
            # unsafe_allow_html=True; confirm create_white_container escapes
            # the transcribed text.

            # Original text in white container
            if result.get('original_text'):
                original_title = _t("النص الأصلي", "Original Text")
                original_container = create_white_container(original_title, result['original_text'], "📝")
                st.markdown(original_container, unsafe_allow_html=True)

            # Translation in white container
            if result.get('translated_text'):
                translation_title = _t("الترجمة", "Translation")
                translation_container = create_white_container(translation_title, result['translated_text'], "🌐")
                st.markdown(translation_container, unsafe_allow_html=True)

            # Language info
            if result.get('detected_language'):
                st.caption(f"🌐 {_t('اللغة المكتشفة', 'Detected language')}: {result['detected_language']}")

            # Action buttons
            col1, col2, col3 = st.columns(3)
            with col1:
                if st.button(f"📋 {_t('نسخ النص', 'Copy Text')}", key=f"copy_original_{task_id}"):
                    st.code(result.get('original_text', ''), language=None)
                    st.success("✅ " + _t("تم تنسيق النص للنسخ", "Text formatted for copying"))

            with col2:
                if result.get('translated_text') and st.button(f"📋 {_t('نسخ الترجمة', 'Copy Translation')}", key=f"copy_translation_{task_id}"):
                    st.code(result.get('translated_text', ''), language=None)
                    st.success("✅ " + _t("تم تنسيق الترجمة للنسخ", "Translation formatted for copying"))

            with col3:
                if st.button(f"🗑️ {_t('حذف', 'Delete')}", key=f"delete_{task_id}"):
                    st.session_state.processing_results.pop(task_id, None)
                    st.session_state.processing_status.pop(task_id, None)
                    st.rerun()
else "Translation" translation_container = create_white_container(translation_title, result['translated_text'], "🌐") st.markdown(translation_container, unsafe_allow_html=True) # Language info if result.get('detected_language'): st.caption(f"🌐 {'اللغة المكتشفة' if st.session_state.language == 'ar' else 'Detected language'}: {result['detected_language']}") # Action buttons col1, col2, col3 = st.columns(3) with col1: if st.button(f"📋 {'نسخ النص' if st.session_state.language == 'ar' else 'Copy Text'}", key=f"copy_original_{task_id}"): st.code(result.get('original_text', ''), language=None) st.success("✅ " + ("تم تنسيق النص للنسخ" if st.session_state.language == 'ar' else "Text formatted for copying")) with col2: if result.get('translated_text') and st.button(f"📋 {'نسخ الترجمة' if st.session_state.language == 'ar' else 'Copy Translation'}", key=f"copy_translation_{task_id}"): st.code(result.get('translated_text', ''), language=None) st.success("✅ " + ("تم تنسيق الترجمة للنسخ" if st.session_state.language == 'ar' else "Translation formatted for copying")) with col3: if st.button(f"🗑️ {'حذف' if st.session_state.language == 'ar' else 'Delete'}", key=f"delete_{task_id}"): del st.session_state.processing_results[task_id] if task_id in st.session_state.processing_status: del st.session_state.processing_status[task_id] st.rerun() # --- Step 1: Upload and Process --- def step_1_upload_and_process(): st.header("🎵 " + ("مصدر الصوت" if st.session_state.language == 'ar' else "Audio Source")) upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"]) with upload_tab: st.subheader("Upload an existing audio file") uploaded_file = st.file_uploader("Choose an audio file", type=['mp3', 'wav', 'm4a'], help="Supported formats: MP3, WAV, M4A") if uploaded_file: st.session_state.audio_data = uploaded_file.getvalue() st.success(f"File ready for processing: {uploaded_file.name}") st.audio(st.session_state.audio_data) if st.button("🚀 Start AI Processing", type="primary", 
use_container_width=True): run_audio_processing(st.session_state.audio_data, uploaded_file.name) if st.session_state.audio_data: if st.button("🔄 Use a Different File"): reset_session() st.rerun() with record_tab: st.subheader("Record audio directly from your microphone") # Recording instructions # Recording instructions with improved controls if st.session_state.language == 'ar': st.info("🎙️ **تحكم بسيط في التسجيل:**\n- اضغط الميكروفون لبدء التسجيل\n- اضغط مرة أخرى للتوقف\n- استخدم الأزرار أدناه للتحكم الإضافي") else: st.info("🎙️ **Simple Recording Controls:**\n- Click microphone to start recording\n- Click again to stop\n- Use buttons below for additional control") # Recording control buttons col_record_info, col_record_controls = st.columns([2, 1]) with col_record_info: # This will show recording status pass with col_record_controls: # Recording control buttons removed - now handled by the audio component itself pass # Recording status recording_status_placeholder = st.empty() # Use the audio recorder component wav_audio_data = st_audiorec() # Show recording status and controls if wav_audio_data: # Check if wav_audio_data is bytes or dict if isinstance(wav_audio_data, bytes): # Simple bytes data - show audio player recording_status_placeholder.success("🎵 " + ("تسجيل جاهز للمعالجة" if st.session_state.language == 'ar' else "Recording ready for processing")) # Recording controls col_play, col_clear = st.columns(2) with col_play: st.audio(wav_audio_data, format='audio/wav') with col_clear: if st.button("🗑️ " + ("مسح التسجيل" if st.session_state.language == 'ar' else "Clear Recording")): st.rerun() elif isinstance(wav_audio_data, dict): # Dict data - handle interval processing recording_status_placeholder.info("🔄 " + ("معالجة المقاطع..." 
if st.session_state.language == 'ar' else "Processing intervals...")) # Google Docs Export and Logout Buttons col_export, col_logout = st.columns([3, 1]) with col_export: export_button_text = "📤 تصدير إلى Google Docs" if st.session_state.language == 'ar' else "📤 Export to Google Docs" if st.button(export_button_text, type="primary", use_container_width=True): export_to_google_docs_directly() with col_logout: # Check if user is authenticated if google_docs_manager.is_authenticated(): logout_text = "🚪 خروج" if st.session_state.language == 'ar' else "🚪 Logout" if st.button(logout_text, use_container_width=True, help="تسجيل الخروج من Google" if st.session_state.language == 'ar' else "Logout from Google"): logout_from_google() else: # Show login status login_status = "غير متصل" if st.session_state.language == 'ar' else "Not logged in" st.caption(f"🔒 {login_status}") # Processing settings st.markdown("**" + ("إعدادات المعالجة" if st.session_state.language == 'ar' else "Processing Settings") + "**") # Auto-process toggle (changed default to False for better UX) st.session_state.setdefault('auto_process_snapshots', False) auto_process = st.checkbox( "معالجة تلقائية للمقاطع" if st.session_state.language == 'ar' else "Auto-process snapshots", key='auto_process_snapshots', help="عند التفعيل، يتم معالجة المقاطع تلقائياً أثناء التسجيل" if st.session_state.language == 'ar' else "When enabled, snapshots are processed automatically during recording" ) # Background processing toggle st.session_state.setdefault('background_processing', True) background_mode = st.checkbox( "معالجة في الخلفية" if st.session_state.language == 'ar' else "Background processing", key='background_processing', value=True, help="يسمح بالاستمرار في استخدام التطبيق أثناء المعالجة" if st.session_state.language == 'ar' else "Allows continued use of the app during processing" ) if wav_audio_data: # Two possible payload shapes: raw bytes array (legacy) or interval payload dict if isinstance(wav_audio_data, dict) 
and wav_audio_data.get('type') in ('interval_wav', 'no_new'): payload = wav_audio_data # Mark Custom interval flow active so Step 2 editor/style can be hidden st.session_state['_custom_active'] = True if payload['type'] == 'no_new': st.info("No new audio chunks yet.") elif payload['type'] == 'interval_wav': # Extract interval audio b = bytes(payload['bytes']) sr = int(payload.get('sr', 16000)) start_ms = int(payload['start_ms']) end_ms = int(payload['end_ms']) # Dedupe/trim logic if end_ms <= start_ms: st.warning("The received interval is empty.") else: # Prevent overlap with prior segment last_end = st.session_state.lastFetchedEnd_ms or 0 eff_start_ms = max(start_ms, last_end) if eff_start_ms < end_ms: # If there is overlap, trim the audio bytes accordingly (assumes WAV PCM16 mono header 44 bytes) try: delta_ms = eff_start_ms - start_ms if delta_ms > 0: if len(b) >= 44 and b[0:4] == b'RIFF' and b[8:12] == b'WAVE': bytes_per_sample = 2 # PCM16 mono drop_samples = int(sr * (delta_ms / 1000.0)) drop_bytes = drop_samples * bytes_per_sample data_size = int.from_bytes(b[40:44], 'little') if len(b) >= 44 else len(b) - 44 pcm = b[44:] if drop_bytes < len(pcm): pcm_trim = pcm[drop_bytes:] else: pcm_trim = b'' new_data_size = len(pcm_trim) # Rebuild header sizes header = bytearray(b[:44]) # ChunkSize at offset 4 = 36 + Subchunk2Size (36 + new_data_size).to_bytes(4, 'little') header[4:8] = (36 + new_data_size).to_bytes(4, 'little') # Subchunk2Size at offset 40 header[40:44] = new_data_size.to_bytes(4, 'little') b = bytes(header) + pcm_trim else: # Not a recognizable WAV header; keep as-is pass except Exception as _: pass # Compute checksum digest = hashlib.md5(b).hexdigest() # Skip if identical checksum and same window exists = any(s.get('checksum') == digest and s.get('start_ms') == eff_start_ms and s.get('end_ms') == end_ms for s in st.session_state.broadcast_segments) if not exists: # Show processing feedback during extraction extraction_message = "Extracting text from 
interval..." if st.session_state.language == 'en' else "جاري استخراج النص من الفترة الزمنية..." feedback_placeholder = show_processing_feedback(extraction_message, st.session_state.language) try: # Run standard pipeline to get text (no translation to keep it light) # Reuse run_audio_processing internals via a temp path with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tf: tf.write(b) tmp_path = tf.name try: processor = AUDIO_PROCESSOR_CLASS() word_timestamps, processor_logs, model_used = processor.get_word_timestamps(tmp_path) full_text = " ".join([d['word'] for d in word_timestamps]) if word_timestamps else "" # Fallback: if timestamps extraction yielded no words, try plain transcription if not full_text: plain_text, err, fallback_model = processor.transcribe_audio(tmp_path) if plain_text: full_text = plain_text.strip() model_used = fallback_model finally: if os.path.exists(tmp_path): os.unlink(tmp_path) # إزالة رسالة المعالجة feedback_placeholder.empty() except Exception as e: feedback_placeholder.empty() cleanup_processing_state() raise e # Append segment immediately with only the original text seg = { 'id': digest, 'recording_id': payload.get('session_id', 'local'), 'start_ms': eff_start_ms, 'end_ms': end_ms, 'checksum': digest, 'text': full_text, 'translations': {}, 'transcription_model': model_used, } st.session_state.broadcast_segments.append(seg) # Sort segments by start time (oldest first for internal storage) st.session_state.broadcast_segments.sort(key=lambda s: s.get('start_ms', 0)) # Debug log for segment addition if st.session_state.get('debug_mode', False): st.write(f"Debug: Added segment #{len(st.session_state.broadcast_segments)}: {eff_start_ms/1000:.1f}s-{end_ms/1000:.1f}s") st.session_state.lastFetchedEnd_ms = end_ms if full_text: if digest not in st.session_state.transcript_ids: st.session_state.transcript_ids.add(digest) st.session_state.transcript_feed.insert( 0, { "id": digest, "ts": int(time.time() * 1000), "text": full_text, 
}, ) st.session_state.edited_text = "\n\n".join( [s["text"] for s in st.session_state.transcript_feed] ) # Show immediate success message success_msg = f"✅ {'تم إضافة مقطع جديد' if st.session_state.language == 'ar' else 'Added new segment'}: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s" st.success(success_msg) # Force UI refresh to show the new segment immediately in the broadcast stack st.rerun() # Now, asynchronously update translation and summary after segment is added def update_translation_and_summary(): try: if full_text and st.session_state.get('enable_translation', True): translator = get_translator() sel_lang = st.session_state.get('broadcast_translation_lang', 'ar') tx, _ = translator.translate_text(full_text, target_language=sel_lang) if tx: seg['translations'][sel_lang] = tx except Exception: pass # Update summary if st.session_state.get('auto_generate_summary', True): try: source_text = " \n".join([s.get('text', '') for s in st.session_state.broadcast_segments if s.get('text')]) if source_text.strip(): summary, _ = generate_summary(source_text, target_language=st.session_state.get('summary_language', 'ar')) if summary: st.session_state.arabic_explanation = summary except Exception: pass import threading threading.Thread(target=update_translation_and_summary, daemon=True).start() else: st.info("Duplicate segment ignored.") else: st.info("No new parts after the last point.") else: # Legacy: treat as full wav bytes bytes_data = bytes(wav_audio_data) # This is not the Custom interval mode st.session_state['_custom_active'] = False st.session_state.audio_data = bytes_data st.audio(bytes_data) digest = hashlib.md5(bytes_data).hexdigest() last_digest = st.session_state.get('_last_component_digest') if st.session_state.auto_process_snapshots and digest != last_digest: st.session_state['_last_component_digest'] = digest task_id = run_audio_processing(bytes_data, "snapshot.wav") if task_id: st.success(f"🔄 {'تم إضافة المقطع للمعالجة' if st.session_state.language 
== 'ar' else 'Snapshot queued for processing'}") else: # Simple single button for processing if st.button("📝 " + ("استخراج النص" if st.session_state.language == 'ar' else "Extract Text"), type="primary", use_container_width=True): st.session_state['_last_component_digest'] = digest task_id = run_audio_processing(bytes_data, "recorded_audio.wav") if task_id: st.success(f"✅ {'تم إضافة التسجيل للمعالجة' if st.session_state.language == 'ar' else 'Audio queued for processing'}") # Simplified: removed external live slice server UI to avoid complexity # Always show Broadcast view in Step 1 as well (regardless of transcription_data) # Use a container that refreshes automatically when segments are added # Always show Broadcast view in Step 1 as well (regardless of transcription_data) with st.expander("📻 Broadcast (latest first)", expanded=True): # Language selector for broadcast translations try: translator = get_translator() langs = translator.get_supported_languages() codes = list(langs.keys()) labels = ["detect language — Arabic (العربية)"] + [f"{code} — {langs[code]}" for code in codes] current = st.session_state.get('broadcast_translation_lang', 'ar') # If not set, default to 'detect' if current not in codes and current != 'detect': current = 'detect' default_index = 0 if current == 'detect' else (codes.index(current) + 1 if current in codes else 1) sel_label = st.selectbox("Broadcast translation language", labels, index=default_index) if sel_label.startswith("detect language"): sel_code = 'detect' else: sel_code = sel_label.split(' — ')[0] st.session_state.broadcast_translation_lang = sel_code except Exception: sel_code = st.session_state.get('broadcast_translation_lang', 'ar') if st.session_state.broadcast_segments: # Show all segments (no pagination for broadcast view) total_segments = len(st.session_state.broadcast_segments) # Ensure we have valid segments with required fields valid_segments = [s for s in st.session_state.broadcast_segments if s.get('text') and 
s.get('start_ms') is not None] if len(valid_segments) != total_segments: st.warning(f"Found {total_segments - len(valid_segments)} invalid segments. Showing {len(valid_segments)} valid segments.") total_segments = len(valid_segments) st.session_state.broadcast_segments = valid_segments # Show total count with better styling st.markdown(f"""
📻 {'البث المباشر' if st.session_state.language == 'ar' else 'Live Broadcast'} • {total_segments} {'مقطع' if st.session_state.language == 'ar' else 'segments'}
""", unsafe_allow_html=True) # Show all segments (newest first) - ensure fresh sorting every time sorted_segments = sorted(st.session_state.broadcast_segments, key=lambda s: s.get('start_ms', 0), reverse=True) for idx, s in enumerate(sorted_segments, 1): # Create unique segment ID segment_id = s.get('id', f"seg_{s['start_ms']}_{s['end_ms']}") # Original text with selection capability original_text = s.get('text', '') if original_text: # Check if this segment is selected is_selected = (st.session_state.selected_segment_id == segment_id) # Create timestamp for bubble with segment number timestamp = f"#{idx} • {s['start_ms']/1000:.1f}s → {s['end_ms']/1000:.1f}s" # Create columns for bubble and ask button col_bubble, col_ask = st.columns([5, 1]) with col_bubble: # Display text as chat bubble bubble_html = create_broadcast_bubble(original_text, timestamp, is_selected) st.markdown(bubble_html, unsafe_allow_html=True) if is_selected: st.success("🔍 " + ("هذا النص محدد للأسئلة" if st.session_state.language == 'ar' else "This text is selected for questions")) with col_ask: # Ask AI button ask_button_text = "🤖 اسأل" if st.session_state.language == 'ar' else "🤖 Ask" button_type = "primary" if not is_selected else "secondary" if st.button(ask_button_text, key=f"ask_{segment_id}", type=button_type, use_container_width=True, help="اسأل الذكاء الاصطناعي عن هذا النص" if st.session_state.language == 'ar' else "Ask AI about this text"): # Select this text and open question modal st.session_state.selected_text = original_text st.session_state.selected_segment_id = segment_id st.session_state.show_question_modal = True st.rerun() # Show model used for transcription model_note = s.get('transcription_model', None) if model_note: st.caption(f"Model used: {model_note}") # Ensure and show translation in selected language if s.get('text') and st.session_state.get('enable_translation', True): if 'translations' not in s or not isinstance(s.get('translations'), dict): s['translations'] = {} # 
def logout_from_google():
    """Sign out of the Google account and report the outcome in the UI.

    On success a confirmation is shown, the function waits one second and
    then reruns the script so the interface reflects the new state. A falsy
    result from ``google_docs_manager.logout()`` or any raised ``Exception``
    is reported with ``st.error``.
    """
    def tr(ar_text, en_text):
        # Read the language at call time, exactly where the label is needed.
        return ar_text if st.session_state.language == 'ar' else en_text

    try:
        if not google_docs_manager.logout():
            st.error(tr("خطأ في تسجيل الخروج", "Error during logout"))
            return

        st.success(tr("تم تسجيل الخروج بنجاح!", "Successfully logged out!"))
        st.info(tr(
            "يمكنك الآن تسجيل الدخول بحساب آخر",
            "You can now login with a different account",
        ))
        # Give the user a moment to read the message, then refresh the UI.
        time.sleep(1)
        st.rerun()
    except Exception as exc:
        st.error(tr(f"خطأ غير متوقع: {str(exc)}", f"Unexpected error: {str(exc)}"))
def export_to_google_docs_directly():
    """Export all broadcast segments to Google Docs and show the result.

    Every segment in ``st.session_state.broadcast_segments`` is sent (no
    timestamp filtering). While the export runs a progress indicator is
    displayed; afterwards either the document link or the error is rendered.
    Any ``Exception`` is reported through ``st.error`` instead of propagating.
    """
    def tr(ar_text, en_text):
        # Read the language at call time, exactly where the label is needed.
        return ar_text if st.session_state.language == 'ar' else en_text

    try:
        segments = st.session_state.broadcast_segments or []

        progress = show_processing_feedback(
            tr("جاري التصدير إلى Google Docs...", "Exporting to Google Docs..."),
            st.session_state.language,
        )
        try:
            doc_url, error = google_docs_manager.export_broadcast_to_docs(
                segments,
                ui_language=st.session_state.language,
            )
            # Remove the progress message.
            progress.empty()
        except Exception:
            progress.empty()
            cleanup_processing_state()
            raise

        if not doc_url or error:
            error_msg = error or "Unknown error occurred"
            st.error(tr(f"خطأ في التصدير: {error_msg}", f"Export error: {error_msg}"))

            # Point to the setup guide when the failure looks credential related.
            lowered = error_msg.lower()
            if "credentials" in lowered or "authentication" in lowered:
                st.info(tr(
                    "📋 يرجى مراجعة ملف GOOGLE_SETUP.md لإعداد Google Docs",
                    "📋 Please check GOOGLE_SETUP.md for Google Docs setup instructions",
                ))
            return

        st.success(tr("تم إنشاء المستند بنجاح!", "Document created successfully!"))

        # Clickable link plus a hint.
        if st.session_state.language == 'ar':
            st.markdown(f"🔗 [فتح المستند في Google Docs]({doc_url})")
            st.info("💡 نصيحة: اضغط على الرابط أعلاه لفتح المستند في تبويب جديد")
        else:
            st.markdown(f"🔗 [Open Document in Google Docs]({doc_url})")
            st.info("💡 Tip: Click the link above to open the document in a new tab")

        # Raw URL so it can be copied.
        st.code(doc_url, language=None)

        if google_docs_manager.is_authenticated():
            st.caption(tr("✅ متصل بحساب Google", "✅ Connected to Google account"))
    except Exception as exc:
        st.error(tr(f"خطأ غير متوقع: {str(exc)}", f"Unexpected error: {str(exc)}"))
def show_question_modal():
    """Render the "Ask AI" panel for the currently selected text.

    Closes itself when no text is selected. Otherwise shows the selected
    text, any previous conversation, the model and answer-language pickers
    (both stored back into session state), quick template buttons and a
    free-text question box. A submitted question is handed to
    ``process_ai_question`` and the function returns right after.
    """
    state = st.session_state
    if not state.selected_text:
        state.show_question_modal = False
        return

    arabic_ui = state.language == 'ar'

    def tr(ar_text, en_text):
        # Choose the label matching the current interface language.
        return ar_text if arabic_ui else en_text

    question_engine = get_ai_question_engine()

    st.subheader(tr("🤖 اسأل الذكاء الاصطناعي", "🤖 Ask AI"))

    with st.expander(tr("النص المحدد", "Selected Text"), expanded=True):
        st.write(f"📝 {state.selected_text}")

    # Previous questions and answers of the running session, if any.
    if state.current_question_session:
        session = question_engine.get_conversation_history(state.current_question_session)
        if session and session.conversation:
            history = session.conversation
            history_label = tr(
                f"💬 تاريخ المحادثة ({len(history)} أسئلة)",
                f"💬 Conversation History ({len(history)} questions)",
            )
            with st.expander(history_label, expanded=False):
                for position, qa in enumerate(history, 1):
                    st.markdown(f"**{position}. {qa.question}**")
                    st.write(qa.answer)

                    model_info = getattr(qa, 'model_used', 'Unknown')
                    # NOTE(review): model_color is computed but does not appear
                    # in the markup below; presumably styling was intended.
                    if "Gemini" in model_info:
                        model_color = "green"
                    elif "Groq" in model_info or "OpenRouter" in model_info:
                        model_color = "orange"
                    else:
                        model_color = "red"

                    st.caption(
                        f"⏱️ {qa.timestamp.strftime('%H:%M:%S')} - {qa.response_time_ms}ms"
                    )
                    st.markdown(f"🔧 {model_info}", unsafe_allow_html=True)
                    if position < len(history):
                        st.divider()

    # --- Model selection ---
    st.markdown("**" + tr("اختيار النموذج", "Model Selection") + "**")

    models_status = question_engine.check_model_availability()
    auto_text = tr("🔄 تلقائي (أفضل نموذج متاح)", "🔄 Auto (Best available model)")
    # Display label -> model name, with the automatic choice listed first.
    model_options = {auto_text: 'auto'}
    model_options.update({
        f"{info['icon']} {name} - {info['message']}": name
        for name, info in models_status.items()
    })

    current_model = state.get('preferred_ai_model', 'auto')
    current_index = next(
        (idx for idx, name in enumerate(model_options.values()) if name == current_model),
        0,
    )

    selected_model_display = st.selectbox(
        tr("النموذج المفضل", "Preferred Model"),
        options=list(model_options.keys()),
        index=current_index,
        help=tr("اختر النموذج المفضل للإجابة", "Choose preferred model for answering"),
    )
    selected_model = model_options[selected_model_display]
    state.preferred_ai_model = selected_model

    # Explain why an explicitly chosen model may not be used.
    if selected_model != 'auto':
        model_state = models_status[selected_model]['status']
        if model_state != 'available':
            if model_state == 'quota_exceeded':
                st.warning(tr(
                    "⚠️ هذا النموذج استنفد حصته اليومية",
                    "⚠️ This model has exceeded its daily quota",
                ))
            elif model_state == 'not_configured':
                st.info(tr(
                    "ℹ️ هذا النموذج غير مُعد - سيتم استخدام البديل",
                    "ℹ️ This model is not configured - fallback will be used",
                ))
            else:
                st.error(tr(
                    "❌ هذا النموذج غير متاح حالياً",
                    "❌ This model is currently unavailable",
                ))

    # --- Answer language ---
    st.markdown("**" + tr("لغة الإجابة", "Answer Language") + "**")

    language_options = {
        tr("🔄 تلقائي (حسب لغة الواجهة)", "🔄 Auto (Interface language)"): 'auto',
        "🇸🇦 العربية": 'ar',
        "🇺🇸 English": 'en',
        "🇫🇷 Français": 'fr',
        "🇪🇸 Español": 'es',
        "🇩🇪 Deutsch": 'de',
        "🇨🇳 中文": 'zh',
    }

    current_lang = state.get('preferred_answer_language', 'auto')
    current_lang_index = next(
        (idx for idx, code in enumerate(language_options.values()) if code == current_lang),
        0,
    )

    selected_language_display = st.selectbox(
        tr("لغة الإجابة المفضلة", "Preferred Answer Language"),
        options=list(language_options.keys()),
        index=current_lang_index,
        help=tr(
            "اختر اللغة التي تريد الحصول على الإجابة بها",
            "Choose the language for AI responses",
        ),
    )
    selected_answer_language = language_options[selected_language_display]
    state.preferred_answer_language = selected_answer_language

    if selected_answer_language == 'auto':
        current_ui_lang = tr("العربية", "English")
        st.caption(tr(
            f"ℹ️ سيتم استخدام لغة الواجهة الحالية: {current_ui_lang}",
            f"ℹ️ Will use current interface language: {current_ui_lang}",
        ))
    else:
        lang_names = {
            'ar': 'العربية', 'en': 'English', 'fr': 'Français',
            'es': 'Español', 'de': 'Deutsch', 'zh': '中文',
        }
        selected_lang_name = lang_names.get(selected_answer_language, selected_answer_language)
        st.caption(tr(
            f"ℹ️ الإجابات ستكون باللغة: {selected_lang_name}",
            f"ℹ️ Answers will be in: {selected_lang_name}",
        ))

    # --- Quick templates (first six, laid out in two columns) ---
    st.markdown("**" + tr("قوالب الأسئلة السريعة", "Quick Question Templates") + "**")
    templates = question_engine.get_question_templates(state.language)

    template_cols = st.columns(2)
    for slot, template in enumerate(templates[:6]):
        with template_cols[slot % 2]:
            if st.button(template, key=f"template_{slot}", use_container_width=True):
                process_ai_question(
                    template,
                    is_template=True,
                    preferred_model=selected_model,
                    answer_language=selected_answer_language,
                )
                return

    # --- Free-text question ---
    st.markdown("**" + tr("أو اكتب سؤالك الخاص", "Or Write Your Own Question") + "**")
    custom_question = st.text_area(
        tr("سؤالك", "Your Question"),
        placeholder=tr("اكتب سؤالك هنا...", "Type your question here..."),
        height=100,
    )

    ask_col, cancel_col = st.columns(2)
    with ask_col:
        ask_clicked = st.button(
            tr("🚀 اسأل", "🚀 Ask"),
            type="primary",
            disabled=not custom_question.strip(),
        )
        if ask_clicked and custom_question.strip():
            process_ai_question(
                custom_question.strip(),
                is_template=False,
                preferred_model=selected_model,
                answer_language=selected_answer_language,
            )
            return
    with cancel_col:
        if st.button(tr("❌ إلغاء", "❌ Cancel")):
            state.show_question_modal = False
            state.selected_text = None
            state.selected_segment_id = None
            st.rerun()
def process_ai_question(question: str, is_template: bool = False, preferred_model: str = 'auto', answer_language: str = 'auto'):
    """Ask the AI engine about the selected text and render the reply.

    Args:
        question: The question to send.
        is_template: Whether the question came from a quick-template button.
            The visible body only forwards it when retrying.
        preferred_model: Model requested by the user, or ``'auto'``.
        answer_language: Language code for the answer; ``'auto'`` means the
            current interface language.

    Raises:
        Exception: Anything raised while querying the engine is re-raised
            after the progress indicator is cleared and
            ``cleanup_processing_state()`` has run.
    """
    state = st.session_state
    arabic_ui = state.language == 'ar'

    def tr(ar_text, en_text):
        # Choose the label matching the current interface language.
        return ar_text if arabic_ui else en_text

    def close_modal():
        # Drop the selection, hide the panel and refresh the page.
        state.show_question_modal = False
        state.selected_text = None
        state.selected_segment_id = None
        st.rerun()

    question_engine = get_ai_question_engine()

    segment_info = {
        'id': state.selected_segment_id,
        'start_ms': 0,  # Placeholder; the real segment bounds are not looked up here.
        'end_ms': 0,
    }

    feedback_placeholder = show_processing_feedback(
        tr("جاري معالجة سؤالك...", "Processing your question..."),
        state.language,
    )
    try:
        answer_lang = state.language if answer_language == 'auto' else answer_language
        result = question_engine.process_question(
            selected_text=state.selected_text,
            question=question,
            segment_info=segment_info,
            ui_language=answer_lang,  # Language the answer should be written in.
            session_id=state.current_question_session,
            preferred_model=preferred_model,
        )
        # Older engines return three values, newer ones add the model name.
        if len(result) == 4:
            answer, error, session_id, model_used = result
        else:
            answer, error, session_id = result
            model_used = "Unknown"
        feedback_placeholder.empty()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        raise  # Bare raise keeps the original traceback intact.

    state.current_question_session = session_id

    if not answer:
        st.error(tr(f"خطأ: {error}", f"Error: {error}"))

        retry_col, close_col = st.columns(2)
        with retry_col:
            if st.button(tr("🔄 إعادة المحاولة", "🔄 Retry")):
                # Forward the user's model and language choices; without them
                # the retry would silently fall back to the 'auto' defaults.
                process_ai_question(
                    question,
                    is_template,
                    preferred_model=preferred_model,
                    answer_language=answer_language,
                )
                return
        with close_col:
            if st.button(tr("❌ إغلاق", "❌ Close")):
                close_modal()
        return

    # Classify the response so the right status banner is shown.
    is_simple_response = (
        "ملاحظة: هذه إجابة مبسطة" in answer
        or "Note: This is a simplified response" in answer
    )
    # Compare against the model that was actually requested for this call.
    model_fallback = preferred_model != 'auto' and preferred_model != model_used

    if is_simple_response:
        st.warning(tr(
            "⚠️ خدمة الذكاء الاصطناعي غير متاحة حالياً - إجابة مبسطة",
            "⚠️ AI service temporarily unavailable - simplified response",
        ))
    elif model_fallback:
        st.info(tr(
            f"ℹ️ النموذج المفضل ({preferred_model}) غير متاح - تم استخدام {model_used}",
            f"ℹ️ Preferred model ({preferred_model}) unavailable - used {model_used}",
        ))
    else:
        st.success(tr("تم الحصول على الإجابة!", "Got the answer!"))

    st.markdown("### " + tr("السؤال", "Question"))
    st.write(f"❓ {question}")
    st.markdown("### " + tr("الإجابة", "Answer"))
    st.write(f"🤖 {answer}")

    # Status line: which model answered and in which language.
    if model_used:
        model_info = question_engine.check_model_availability().get(model_used, {})
        icon = model_info.get('icon', '🤖')
        # NOTE(review): color is computed but does not appear in the markup
        # below; presumably a styled wrapper was intended. Confirm.
        color = model_info.get('color', 'gray')
        if model_fallback:
            fallback_msg = tr(" (تم التبديل للبديل)", " (fallback used)")
            color = "orange"
        else:
            fallback_msg = ""
        model_display = f"{icon} {model_used}{fallback_msg}"

        if answer_language == 'auto':
            lang_display = tr("تلقائي", "Auto")
            lang_flag = "🔄"
        else:
            lang_flags = {'ar': '🇸🇦', 'en': '🇺🇸', 'fr': '🇫🇷', 'es': '🇪🇸', 'de': '🇩🇪', 'zh': '🇨🇳'}
            lang_names = {
                'ar': 'العربية', 'en': 'English', 'fr': 'Français',
                'es': 'Español', 'de': 'Deutsch', 'zh': '中文',
            }
            lang_flag = lang_flags.get(answer_language, '🌐')
            lang_display = lang_names.get(answer_language, answer_language)

        info_text = (
            f"🔧 {tr('النموذج', 'Model')}: {model_display} | "
            f"🌐 {tr('اللغة', 'Language')}: {lang_flag} {lang_display}"
        )
        st.markdown(f"\n{info_text}\n", unsafe_allow_html=True)

    # Extra guidance when only a simplified answer could be produced.
    if is_simple_response:
        with st.expander(tr("💡 نصائح للحصول على إجابات أفضل", "💡 Tips for better answers")):
            if arabic_ui:
                tips = (
                    "\n**لماذا الإجابة مبسطة؟**\n"
                    "- تم استنفاد الحد اليومي لخدمة Gemini AI المجانية (50 طلب/يوم)\n"
                    "- النظام يستخدم إجابات مبسطة كبديل مؤقت\n"
                    "\n**للحصول على إجابات أفضل:**\n"
                    "- حاول مرة أخرى غداً (يتم تجديد الحد اليومي)\n"
                    "- اطرح أسئلة أكثر تحديداً\n"
                    "- ابحث في مصادر إضافية للموضوع\n"
                )
            else:
                tips = (
                    "\n**Why is the answer simplified?**\n"
                    "- Daily limit for free Gemini AI service exceeded (50 requests/day)\n"
                    "- System is using simplified responses as temporary fallback\n"
                    "\n**For better answers:**\n"
                    "- Try again tomorrow (daily limit resets)\n"
                    "- Ask more specific questions\n"
                    "- Search additional sources for the topic\n"
                )
            st.markdown(tips)

    # Actions for the rendered answer.
    copy_col, follow_col, close_col = st.columns(3)
    with copy_col:
        if st.button(tr("📋 نسخ", "📋 Copy")):
            copy_text = tr(
                f"السؤال: {question}\nالإجابة: {answer}",
                f"Question: {question}\nAnswer: {answer}",
            )
            st.code(copy_text, language=None)
            st.success(tr("تم تنسيق النص للنسخ أعلاه", "Text formatted for copying above"))
    with follow_col:
        if st.button(tr("➕ سؤال متابعة", "➕ Follow-up")):
            # The panel stays open; a rerun shows the question form again.
            st.rerun()
    with close_col:
        if st.button(tr("✅ إغلاق", "✅ Close")):
            close_modal()
# --- Export Modal Function ---
def show_export_modal():
    """Render the export modal: preview, format options, Google auth, actions.

    Reads ``export_timestamp``, ``broadcast_segments`` and ``language`` from
    ``st.session_state``; writes ``google_auth``, ``export_format`` and
    ``show_export_modal``. Segments whose ``start_ms`` is at or after
    ``export_timestamp`` are previewed and, when the user presses Export,
    handed to ``perform_export``.

    Returns:
        None. All output is rendered through Streamlit.
    """
    # Create the Google Docs auth helper once per session.
    # NOTE(review): GoogleDocsAuth is not among the imports visible at the top
    # of this file -- confirm it is imported before this function can run.
    if 'google_auth' not in st.session_state:
        st.session_state.google_auth = GoogleDocsAuth()
    google_auth = st.session_state.google_auth

    # Without a timestamp or any segments there is nothing to export.
    if not st.session_state.export_timestamp or not st.session_state.broadcast_segments:
        st.session_state.show_export_modal = False
        return

    is_arabic = st.session_state.language == 'ar'

    def tr(arabic, english):
        """Return the string matching the current UI language."""
        return arabic if is_arabic else english

    # Keep segments starting at/after the export timestamp, oldest first.
    export_timestamp = st.session_state.export_timestamp
    filtered_segments = sorted(
        (
            segment
            for segment in st.session_state.broadcast_segments
            if segment.get('start_ms', 0) >= export_timestamp
        ),
        key=lambda s: s.get('start_ms', 0),
    )

    if not filtered_segments:
        st.warning(tr("لا توجد مقاطع جديدة للتصدير منذ الضغط على الزر",
                      "No new segments to export since button press"))
        if st.button(tr("إغلاق", "Close")):
            st.session_state.show_export_modal = False
            st.rerun()
        return

    # Export preview
    st.subheader(tr("📋 معاينة التصدير", "📋 Export Preview"))
    export_time = datetime.fromtimestamp(export_timestamp / 1000)
    st.info(f"{tr('المقاطع من وقت', 'Segments from')}: {export_time.strftime('%H:%M:%S')}")
    st.info(f"{tr('عدد المقاطع', 'Number of segments')}: {len(filtered_segments)}")

    # Show a preview of the first few segments only.
    preview_limit = 3
    with st.expander(tr("معاينة المحتوى", "Content Preview"), expanded=False):
        preview = filtered_segments[:preview_limit]
        for i, segment in enumerate(preview):
            start_time = segment.get('start_ms', 0) / 1000
            end_time = segment.get('end_ms', 0) / 1000
            st.markdown(f"**[{start_time:.2f}s → {end_time:.2f}s]**")
            # A segment may carry text=None; treat it as empty instead of
            # crashing on len(None).
            text = segment.get('text') or ''
            st.write(text[:100] + "..." if len(text) > 100 else text)
            # Divider between previewed segments, not after the last one.
            if i < len(preview) - 1:
                st.divider()
        if len(filtered_segments) > preview_limit:
            remaining = len(filtered_segments) - preview_limit
            st.caption(f"... {tr('و', 'and')} {remaining} {tr('مقاطع أخرى', 'more segments')}")

    # Export options
    col1, col2 = st.columns(2)
    with col1:
        format_options = {
            "📄 Word Document": "word",
            "📝 Google Docs": "google_docs",
        }
        selected_format = st.selectbox(
            tr("تنسيق التصدير", "Export Format"),
            options=list(format_options.keys()),
            index=0,
        )
        st.session_state.export_format = format_options[selected_format]
    with col2:
        include_summary = st.checkbox(
            tr("تضمين الملخص", "Include Summary"),
            value=True,
        )

    # Google Docs authentication section
    if st.session_state.export_format == 'google_docs':
        st.markdown("---")
        if google_auth.is_authenticated():
            st.success("✅ " + tr("متصل بـ Google Docs", "Connected to Google Docs"))
            col_logout, col_info = st.columns([1, 2])
            with col_logout:
                if st.button("🚪 " + tr("تسجيل خروج", "Logout")):
                    google_auth.logout()
                    st.rerun()
            with col_info:
                st.caption(tr("سيتم إنشاء المستند في حسابك على Google",
                              "Document will be created in your Google account"))
        else:
            st.warning("🔐 " + tr("يجب تسجيل الدخول إلى Google Docs أولاً",
                                  "Please authenticate with Google Docs first"))

            # Handle OAuth callback: the provider redirects back with ?code=...
            auth_code = st.query_params.get("code")
            if auth_code:
                success, message = google_auth.handle_auth_callback(auth_code)
                if success:
                    st.success(message)
                    # Clear the code from the URL so it is not replayed.
                    st.query_params.clear()
                    st.rerun()
                else:
                    st.error(message)

            # Show authentication button. The sign-in URL has to be rendered
            # as an actual link; st.link_button opens it in a new tab.
            auth_url, error = google_auth.get_auth_url()
            if auth_url:
                st.link_button(
                    tr("🔗 تسجيل الدخول عبر Google", "🔗 Sign in with Google"),
                    auth_url,
                )
                st.caption(tr("سيتم فتح نافذة جديدة للمصادقة",
                              "A new window will open for authentication"))
            else:
                st.error(tr(f"خطأ في إعداد Google API: {error}",
                            f"Google API setup error: {error}"))
                st.info(tr("يرجى إعداد GOOGLE_CLIENT_ID و GOOGLE_CLIENT_SECRET في متغيرات البيئة",
                           "Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables"))

    # Export buttons
    col_export, col_cancel = st.columns(2)
    with col_export:
        # Disable export when Google Docs is selected but not authenticated.
        export_disabled = (
            st.session_state.export_format == 'google_docs'
            and not google_auth.is_authenticated()
        )
        if st.button(tr("🚀 تصدير", "🚀 Export"), type="primary", disabled=export_disabled):
            perform_export(filtered_segments, include_summary, google_auth)
    with col_cancel:
        if st.button(tr("❌ إلغاء", "❌ Cancel")):
            st.session_state.show_export_modal = False
            st.rerun()
# --- Export Execution Function ---
def _close_export_modal():
    """Button callback: mark the export modal as closed."""
    st.session_state.show_export_modal = False


def perform_export(segments, include_summary=True, google_auth=None):
    """Perform the actual export operation and render its outcome.

    Builds an ``ExportConfig`` from ``st.session_state`` (``export_timestamp``,
    ``export_format``, ``language``, ``broadcast_translation_lang``), prepares
    the content with ``BroadcastExporter`` and calls
    ``export_with_fallback``. For the ``'word'`` format the result is opened
    as a file and offered through a download button; otherwise it is rendered
    as a Google Docs link. Any exception is reported with ``st.error``
    instead of propagating.

    Args:
        segments: Segments to export, passed to ``prepare_export_content``.
        include_summary: Forwarded to ``ExportConfig.include_summary``.
        google_auth: Auth object forwarded to ``export_with_fallback``.

    Returns:
        None. All output is rendered through Streamlit.
    """
    is_arabic = st.session_state.language == 'ar'

    def tr(arabic, english):
        """Return the string matching the current UI language."""
        return arabic if is_arabic else english

    try:
        # Initialize exporter
        translator = get_translator()
        exporter = BroadcastExporter(translator)

        # Create export configuration
        config = ExportConfig(
            export_timestamp=st.session_state.export_timestamp,
            format_type=st.session_state.export_format,
            include_summary=include_summary,
            ui_language=st.session_state.language,
            target_language=st.session_state.get('broadcast_translation_lang', 'ar'),
        )

        # Prepare export content
        content = exporter.prepare_export_content(segments, config)

        # Show progress using custom feedback
        export_message = tr("جاري التصدير...", "Exporting...")
        feedback_placeholder = show_processing_feedback(export_message, st.session_state.language)

        try:
            # Perform export with fallback
            result, error = exporter.export_with_fallback(content, config, google_auth)
            # Remove the processing message
            feedback_placeholder.empty()
        except Exception:
            feedback_placeholder.empty()
            cleanup_processing_state()
            # Bare raise keeps the original traceback for the outer handler.
            raise

        if result and not error:
            if config.format_type == 'word':
                # Provide download link for Word document
                with open(result, 'rb') as file:
                    st.download_button(
                        label=tr("📥 تحميل الملف", "📥 Download File"),
                        data=file.read(),
                        file_name=os.path.basename(result),
                        mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                    )
                st.success(tr("تم إنشاء الملف بنجاح!", "File created successfully!"))
            else:
                # Google Docs URL
                st.success(tr("تم إنشاء المستند بنجاح!", "Document created successfully!"))
                st.markdown(tr(f"[فتح في Google Docs]({result})",
                               f"[Open in Google Docs]({result})"))
        else:
            st.error(tr(f"خطأ في التصدير: {error}", f"Export error: {error}"))

    except Exception as e:
        st.error(tr(f"خطأ غير متوقع: {str(e)}", f"Unexpected error: {str(e)}"))

    # Close modal after export attempt.
    # This function only runs in the script run where the Export button was
    # clicked, so an `if st.button(...)` check here would never be evaluated
    # on the rerun caused by clicking Close. An on_click callback is executed
    # before that rerun regardless of whether this code is reached again.
    st.button(tr("إغلاق", "Close"), on_click=_close_export_modal)
def reset_session():
    """Drop the per-run workflow keys from session state, then re-initialize it."""
    log_to_browser_console("--- INFO: Resetting session state. ---")

    # Keys removed on reset; any other session entries are left untouched.
    stale_keys = (
        'step',
        'audio_data',
        'transcription_data',
        'edited_text',
        'video_style',
        'new_recording',
    )
    for name in stale_keys:
        if name not in st.session_state:
            continue
        del st.session_state[name]

    initialize_session_state()
# --- Entry Point ---
# Start the app only when executed as a script and the API key check passes.
if __name__ == "__main__" and check_api_key():
    initialize_session_state()
    main()