# Spaces: Sleeping  (Hugging Face Spaces page-status residue; not part of app.py)
| # app.py - Refactored to eliminate recorder_server.py dependency | |
| import streamlit as st | |
| import os | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| import time | |
| import traceback | |
| import streamlit.components.v1 as components | |
| import hashlib | |
| from datetime import datetime | |
| # from st_audiorec import st_audiorec # Import the new recorder component - OLD | |
# Usage-metrics writes can cause permission errors on hosted environments, so
# switch them off. Failing to set the option is deliberately ignored.
try:
    st.set_option("browser.gatherUsageStats", False)
except Exception:
    pass

# Robust component declaration: prefer local build, else fall back to pip package.
# build_dir is where a locally built recorder frontend is looked for.
parent_dir = os.path.dirname(os.path.abspath(__file__))
build_dir = os.path.join(
    parent_dir,
    "custom_components/st-audiorec/st_audiorec/frontend/build",
)
def st_audiorec(key=None):
    """Render the audio recorder component and return its value.

    The locally built frontend in ``build_dir`` is used when that directory
    exists; otherwise the pip-installed ``st_audiorec`` package is tried.
    When neither works, a Streamlit warning is shown and ``None`` is returned.

    Args:
        key: Optional Streamlit widget key forwarded to the component.
    """
    try:
        if not os.path.isdir(build_dir):
            # No local build available: fall back to the pip-installed component.
            try:
                from st_audiorec import st_audiorec as packaged_recorder
                return packaged_recorder(key=key)
            except Exception:
                st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
                return None

        local_recorder = components.declare_component("st_audiorec", path=build_dir)
        return local_recorder(key=key, default=0)
    except Exception:
        # Final safety net for anything raised while declaring/rendering.
        st.warning("Failed to initialize audio recorder component.")
        return None
# --- Critical Imports and Initial Checks ---
# Import AudioProcessor defensively: on failure the class slot stays None and
# the formatted traceback is kept so the failure can be reported later.
try:
    from audio_processor import AudioProcessor
except Exception:
    AUDIO_PROCESSOR_CLASS = None
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()
else:
    AUDIO_PROCESSOR_CLASS = AudioProcessor
    IMPORT_ERROR_TRACEBACK = None
| from video_generator import VideoGenerator | |
| from mp3_embedder import MP3Embedder | |
| from utils import format_timestamp | |
| from translator import get_translator, UI_TRANSLATIONS | |
| from exporter import BroadcastExporter, ExportConfig | |
| from google_docs_config import google_docs_manager | |
| from ai_questions import get_ai_question_engine, TextSelection | |
| from style_fixes import apply_custom_styling, create_broadcast_bubble, create_white_container, create_processing_result_container | |
| import requests | |
| from dotenv import load_dotenv | |
| # --- API Key Check --- | |
def check_api_key():
    """Report whether the GEMINI_API_KEY environment variable is set.

    ``load_dotenv()`` is called first. When the variable is missing or empty,
    an error and step-by-step setup instructions are rendered in the page.

    Returns:
        True when the key is present, False otherwise.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True

    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
    1. **Find the file named `.env.example`** in the `syncmaster2` directory.
    2. **Rename it to `.env`**.
    3. **Open the `.env` file** with a text editor.
    4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
    5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
    6. **Save the file and restart the application.**
    """)
    return False
| # --- Summary Helper (robust to cached translator without summarize_text) --- | |
def generate_summary(text: str, target_language: str = 'ar'):
    """Summarize *text* in *target_language*, degrading gracefully.

    The translator's ``summarize_text`` is used when it exists and is
    callable. If it is missing or yields nothing, an Arabic summary is
    produced via ``summarize_text_arabic`` and, for a non-Arabic target,
    translated with ``translate_text``.

    Returns:
        A ``(summary, error)`` tuple. Any exception raised along the way is
        reported as ``(None, str(exception))``.
    """
    engine = get_translator()
    source = text or ''
    try:
        summarize = getattr(engine, 'summarize_text', None)
        if callable(summarize):
            summary, _ = summarize(source, target_language=target_language)
            if summary:
                return summary, None

        # Fallback path: Arabic summary first.
        arabic_summary, arabic_error = engine.summarize_text_arabic(source)
        needs_translation = (
            target_language and target_language != 'ar' and arabic_summary
        )
        if not needs_translation:
            return arabic_summary, arabic_error

        translated, translate_error = engine.translate_text(
            arabic_summary, target_language=target_language
        )
        if translated:
            return translated, None
        # Translation produced nothing: hand back the Arabic summary together
        # with the translation error.
        return arabic_summary, translate_error
    except Exception as exc:
        return None, str(exc)
| # --- Custom Feedback Functions (No Dimming Effect) --- | |
def show_processing_feedback(message: str, language: str = 'en'):
    """Show processing feedback without dimming the page.

    Args:
        message: Text to display; it is prefixed with a "🔄" marker.
        language: Accepted for interface compatibility. The rendered text is
            the same for every language, so the value is not consulted.

    Returns:
        The value returned by ``st.info``; callers in this file call
        ``.empty()`` on it to remove the notice.
    """
    # st.info is used instead of st.spinner to avoid the dimming effect.
    return st.info(f"🔄 {message}")
def show_status_update(message: str, status_type: str = 'info', language: str = 'en'):
    """Show a status update without dimming the page.

    Args:
        message: Text to display after the status icon.
        status_type: ``'success'``, ``'error'`` or ``'warning'``; any other
            value is rendered as an informational notice.
        language: Accepted but not used by this function.
    """
    # Map each status to its emoji and the Streamlit call that renders it.
    renderers = {
        'success': ("✅", st.success),
        'error': ("❌", st.error),
        'warning': ("⚠️", st.warning),
    }
    icon, render = renderers.get(status_type, ("ℹ️", st.info))
    render(f"{icon} {message}")
def cleanup_processing_state():
    """Reset processing state after an error.

    When present, ``processing_state`` is replaced with an idle record, and
    every task in ``processing_status`` still marked ``'processing'`` or
    ``'queued'`` is flipped to ``'failed'``.
    """
    session = st.session_state

    if 'processing_state' in session:
        session.processing_state = {
            'is_processing': False,
            'current_task': '',
            'progress': 0.0,
            'message': '',
            'language': session.get('language', 'en'),
        }

    # Clean up the other per-task processing states.
    if 'processing_status' in session:
        unfinished = ['processing', 'queued']
        for task_id in list(session.processing_status.keys()):
            if session.processing_status[task_id] in unfinished:
                session.processing_status[task_id] = 'failed'
def show_processing_progress(message: str, progress: float = None, language: str = 'en'):
    """Show processing progress with a progress bar, without dimming the page.

    Args:
        message: Text to display; it is prefixed with a "🔄" marker.
        progress: Initial progress value passed to ``st.progress``. ``None``
            starts the bar at 0 (used as an "indeterminate" bar).
        language: Accepted for interface compatibility. The rendered text is
            the same for every language, so the value is not consulted.

    Returns:
        The progress-bar element returned by ``st.progress``.
    """
    st.info(f"🔄 {message}")
    return st.progress(0 if progress is None else progress)
# --- Page Configuration ---
# Browser-tab title and icon, plus a full-width page layout.
st.set_page_config(
    layout="wide",
    page_icon="🎵",
    page_title="SyncMaster - AI Audio-Text Synchronization",
)
| # --- Browser Console Logging Utility --- | |
def log_to_browser_console(messages):
    """Inject JavaScript that logs *messages* to the browser's console.

    Args:
        messages: A single string or an iterable of values; each value is
            converted with ``str``. ``None`` is treated as "no messages".

    Each message is routed to ``console.error`` / ``warn`` / ``debug`` /
    ``log`` depending on whether it contains ``--- ERROR`` / ``--- FATAL``,
    ``--- WARNING`` or ``--- DEBUG``.
    """
    if messages is None:
        messages = []
    elif isinstance(messages, str):
        messages = [messages]

    # json.dumps produces valid JS string literals but leaves "<" untouched,
    # so a message containing "</script>" (e.g. inside a traceback) would
    # terminate the script element early. Emitting "<" as \u003c keeps the
    # decoded JS string identical while making the markup safe.
    escaped_messages = [
        json.dumps(str(msg)).replace("<", "\\u003c") for msg in messages
    ]
    js_code = f"""
    <script>
    (function() {{
        const logs = [{', '.join(escaped_messages)}];
        console.group("Backend Logs from SyncMaster");
        logs.forEach(log => {{
            const content = String(log);
            if (content.includes('--- ERROR') || content.includes('--- FATAL')) {{
                console.error(log);
            }} else if (content.includes('--- WARNING')) {{
                console.warn(log);
            }} else if (content.includes('--- DEBUG')) {{
                console.debug(log);
            }} else {{
                console.log(log);
            }}
        }});
        console.groupEnd();
    }})();
    </script>
    """
    components.html(js_code, height=0, scrolling=False)
| # --- AI Models Reset Function --- | |
def reset_ai_models():
    """Reset all AI models and clear their cached state.

    Steps, in order:
      1. Delete every session-state key whose lower-cased name contains one
         of: translator, ai, question, model, engine, processing.
      2. Reload environment variables with ``load_dotenv(override=True)``.
      3. Drop ``translator``, ``ai_questions`` and ``exporter`` from
         ``sys.modules`` so the next import re-executes them.
      4. Re-import ``translator`` / ``ai_questions`` and clear their global
         instances (``translator_instance`` / ``ai_question_engine``) when
         those attributes exist.
    """
    import sys

    # Clear ALL session state keys that might contain cached AI instances.
    cache_terms = ('translator', 'ai', 'question', 'model', 'engine', 'processing')
    keys_to_clear = [
        key for key in list(st.session_state.keys())
        if any(term in key.lower() for term in cache_terms)
    ]
    for key in keys_to_clear:
        del st.session_state[key]

    # Force reload environment variables.
    load_dotenv(override=True)

    # Forget the cached modules; pop() tolerates modules that were never loaded.
    for module_name in ('translator', 'ai_questions', 'exporter'):
        sys.modules.pop(module_name, None)

    # Clear any global instances. This is best-effort: a module that fails to
    # import must not abort the reset, but only ordinary exceptions are
    # swallowed (a bare ``except`` would also trap BaseException subclasses).
    try:
        import translator
        if hasattr(translator, 'translator_instance'):
            translator.translator_instance = None
    except Exception:
        pass

    try:
        import ai_questions
        if hasattr(ai_questions, 'ai_question_engine'):
            ai_questions.ai_question_engine = None
    except Exception:
        pass
| # --- Session State Initialization --- | |
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key not yet set.

    Keys that already exist are left untouched.
    """
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,
        # Transcript feed (prepend latest) and dedupe set
        'transcript_feed': [],  # list of {id, ts, text}
        'transcript_ids': set(),
        # Incremental broadcast state
        'broadcast_segments': [],  # [{id, recording_id, start_ms, end_ms, checksum, text}]
        'lastFetchedEnd_ms': 0,
        # Broadcast translation language (separate from general UI translation
        # target); defaults to Arabic
        'broadcast_translation_lang': 'ar',
        # Default summary language is Arabic
        'summary_language': 'ar',
        # Auto-generate Arabic summary toggle
        'auto_generate_summary': True,
        # Export functionality state
        'export_timestamp': None,
        'show_export_modal': False,
        'export_format': 'word',
        # AI Questions functionality state
        'selected_text': None,
        'selected_segment_id': None,
        'show_question_modal': False,
        'current_question_session': None,
        'preferred_ai_model': 'auto',
        'preferred_answer_language': 'auto',
        # Background processing state
        'processing_queue': [],
        'processing_results': {},
        'processing_status': {},
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
| # --- Background Audio Processing Function --- | |
def queue_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Queue audio for background processing.

    Appends a task record to ``processing_queue``, marks it ``'queued'`` in
    ``processing_status`` and shows a notice containing the task id.

    Args:
        audio_bytes: Audio payload stored on the task.
        original_filename: File name stored on the task.

    Returns:
        The 8-character task id.
    """
    import uuid

    # The first 8 hex digits of a random UUID identify the task.
    task_id = uuid.uuid4().hex[:8]

    st.session_state.processing_queue.append({
        'id': task_id,
        'audio_bytes': audio_bytes,
        'filename': original_filename,
        'timestamp': time.time(),
        'status': 'queued',
    })
    st.session_state.processing_status[task_id] = 'queued'

    # Immediate feedback in the interface language.
    if st.session_state.language == 'ar':
        notice = 'تم إضافة التسجيل للمعالجة'
    else:
        notice = 'Audio queued for processing'
    st.info(f"🔄 {notice} (ID: {task_id})")
    return task_id
def process_queued_audio():
    """Process the first task in ``processing_queue``, if any.

    The task is run through ``run_audio_processing_sync``; its outcome is
    recorded in ``processing_status`` (and ``processing_results`` on
    success), a status message is shown, and the task is removed from the
    queue whether it succeeded or failed.
    """
    if not st.session_state.processing_queue:
        return

    task = st.session_state.processing_queue[0]
    task_id = task['id']

    def localized(arabic, english):
        # Pick the message matching the current interface language.
        return arabic if st.session_state.language == 'ar' else english

    def mark(state):
        # Keep the status table and the task record in step.
        st.session_state.processing_status[task_id] = state
        task['status'] = state

    mark('processing')

    # Custom feedback is used here instead of st.status.
    processing_message = f"{localized('معالجة التسجيل', 'Processing audio')} {task_id}..."
    feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)

    try:
        result = run_audio_processing_sync(task['audio_bytes'], task['filename'])
        # Remove the "processing" notice.
        feedback_placeholder.empty()
        if not result:
            mark('failed')
            show_status_update(f"{localized('فشلت المعالجة', 'Processing failed')} {task_id}", 'error', st.session_state.language)
        else:
            st.session_state.processing_results[task_id] = result
            mark('completed')
            show_status_update(f"{localized('تم الانتهاء من المعالجة', 'Processing completed')} {task_id}", 'success', st.session_state.language)
    except Exception as e:
        # Remove the "processing" notice on failure as well.
        feedback_placeholder.empty()
        mark('failed')
        show_status_update(f"{localized('خطأ في المعالجة', 'Processing error')} {task_id}: {str(e)}", 'error', st.session_state.language)
        cleanup_processing_state()

    # Remove the handled task from the queue.
    st.session_state.processing_queue.pop(0)
| # --- Centralized Audio Processing Function --- | |
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Entry point for audio processing with optional background queuing.

    When the ``background_processing`` session flag is truthy (it defaults to
    True when unset) the audio is queued and the task id is returned;
    otherwise it is processed synchronously and that result is returned.
    """
    use_background = st.session_state.get('background_processing', True)
    handler = queue_audio_processing if use_background else run_audio_processing_sync
    return handler(audio_bytes, original_filename)
def _update_transcript_feed(audio_bytes, text):
    """Prepend *text* to the transcript feed (deduped by audio digest) and rebuild ``edited_text``."""
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        # Fall back to a time-based id when the audio cannot be hashed.
        digest = f"snap-{int(time.time()*1000)}"
    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0, {"id": digest, "ts": int(time.time() * 1000), "text": text}
        )
    # Rebuild edited_text with newest first.
    # NOTE(review): the rebuild runs on every call, including when the digest
    # was already in the feed - confirm that overwriting edited_text in that
    # case is intended.
    st.session_state.edited_text = "\n\n".join(
        [s["text"] for s in st.session_state.transcript_feed]
    )


def run_audio_processing_sync(audio_bytes, original_filename="recorded_audio.wav"):
    """
    A single, robust function to handle all audio processing.

    The audio is written to a temporary file (always removed afterwards),
    transcribed - with translation when ``enable_translation`` is set - and
    the outcome is stored in ``st.session_state.transcription_data`` and the
    transcript feed.

    Args:
        audio_bytes: Raw audio content to process.
        original_filename: Used only for its suffix, which names the
            temporary file and is recorded as ``original_suffix``.

    Returns:
        A dict with ``original_text``, ``translated_text``,
        ``detected_language``, ``translation_success`` and
        ``word_timestamps``; ``None`` when there is no audio, when the
        processor yields nothing, or when an error occurs (errors are shown
        in the page and logged to the browser console).
    """
    # This function is the classic, non-Custom path; ensure editor sections are enabled.
    st.session_state['_custom_active'] = False
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return None

    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        suffix = Path(original_filename).suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        processor = AUDIO_PROCESSOR_CLASS()

        # Determine which processing path to take.
        if st.session_state.enable_translation:
            # Custom feedback is used instead of st.spinner.
            processing_message = "Performing AI Transcription & Translation... please wait." if st.session_state.language == 'en' else "جاري تنفيذ النسخ والترجمة بالذكاء الاصطناعي... يرجى الانتظار."
            feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)
            try:
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language,
                )
                # Remove the "processing" notice.
                feedback_placeholder.empty()
            except Exception:
                feedback_placeholder.empty()
                cleanup_processing_state()
                raise  # bare raise keeps the original traceback intact

            log_to_browser_console(processor_logs)
            if not result_data or not result_data.get("original_text"):
                st.warning(
                    "Could not generate transcription with translation. Check browser console (F12) for logs."
                )
                return None

            st.session_state.transcription_data = {
                "text": result_data["original_text"],
                "translated_text": result_data["translated_text"],
                "word_timestamps": result_data["word_timestamps"],
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": result_data.get("translation_success", False),
                "detected_language": result_data.get("language_detected", "unknown"),
            }
            _update_transcript_feed(audio_bytes, result_data["original_text"])

            result = {
                'original_text': result_data.get("original_text"),
                'translated_text': result_data.get("translated_text"),
                # Same "unknown" default as transcription_data above, so the
                # two records never disagree when the key is missing.
                'detected_language': result_data.get("language_detected", "unknown"),
                'translation_success': result_data.get("translation_success", False),
                'word_timestamps': result_data.get("word_timestamps"),
            }
        else:  # Standard processing without translation
            # Custom feedback is used instead of st.spinner.
            processing_message = "Performing AI Transcription... please wait." if st.session_state.language == 'en' else "جاري تنفيذ النسخ بالذكاء الاصطناعي... يرجى الانتظار."
            feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)
            try:
                word_timestamps, processor_logs = processor.get_word_timestamps(
                    tmp_file_path
                )
                # Remove the "processing" notice.
                feedback_placeholder.empty()
            except Exception:
                feedback_placeholder.empty()
                cleanup_processing_state()
                raise  # bare raise keeps the original traceback intact

            log_to_browser_console(processor_logs)
            if not word_timestamps:
                st.warning(
                    "Could not generate timestamps. Check browser console (F12) for logs."
                )
                return None

            full_text = " ".join([d["word"] for d in word_timestamps])
            st.session_state.transcription_data = {
                "text": full_text,
                "word_timestamps": word_timestamps,
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": False,
            }
            _update_transcript_feed(audio_bytes, full_text)

            result = {
                'original_text': full_text,
                'translated_text': None,
                'detected_language': "unknown",
                'translation_success': False,
                'word_timestamps': word_timestamps,
            }

        st.session_state.step = 1  # Keep it on the same step
        # Return result for background processing.
        return result
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
        return None
    finally:
        # The temp file was created with delete=False, so remove it here.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
| # --- Main Application Logic --- | |
| def main(): | |
| # Apply custom styling first | |
| apply_custom_styling() | |
| # Force reload environment variables | |
| load_dotenv(override=True) | |
| # Clear AI models cache on first run or if there's an issue | |
| if 'app_initialized' not in st.session_state: | |
| reset_ai_models() | |
| st.session_state.app_initialized = True | |
| initialize_session_state() | |
| st.markdown(""" | |
| <style> | |
| .main .block-container { animation: fadeIn 0.2s ease-in-out; } | |
| @keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } } | |
| .block-container { padding-top: 1rem; } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| with st.sidebar: | |
| st.markdown("## 🌐 Language Settings") | |
| language_options = {'English': 'en', 'العربية': 'ar'} | |
| selected_lang_display = st.selectbox( | |
| "Interface Language", | |
| options=list(language_options.keys()), | |
| index=0 if st.session_state.language == 'en' else 1 | |
| ) | |
| st.session_state.language = language_options[selected_lang_display] | |
| st.markdown("## 🔤 Translation Settings") | |
| st.session_state.enable_translation = st.checkbox( | |
| "Enable AI Translation" if st.session_state.language == 'en' else "تفعيل الترجمة بالذكاء الاصطناعي", | |
| value=st.session_state.enable_translation, | |
| help="Automatically translate transcribed text" if st.session_state.language == 'en' else "ترجمة النص تلقائياً" | |
| ) | |
| if st.session_state.enable_translation: | |
| target_lang_options = { | |
| 'Arabic (العربية)': 'ar', 'English': 'en', 'French (Français)': 'fr', 'Spanish (Español)': 'es' | |
| } | |
| selected_target = st.selectbox( | |
| "Target Language" if st.session_state.language == 'en' else "اللغة المستهدفة", | |
| options=list(target_lang_options.keys()), index=0 | |
| ) | |
| st.session_state.target_language = target_lang_options[selected_target] | |
| # Auto summary toggle | |
| st.session_state.auto_generate_summary = st.checkbox( | |
| "Auto-generate Arabic summary" if st.session_state.language == 'en' else "توليد الملخص العربي تلقائياً", | |
| value=st.session_state.auto_generate_summary | |
| ) | |
| # Google Account Status | |
| st.markdown("## 🔐 Google Account") | |
| if google_docs_manager.is_authenticated(): | |
| st.success("✅ متصل" if st.session_state.language == 'ar' else "✅ Connected") | |
| else: | |
| st.info("🔒 غير متصل" if st.session_state.language == 'ar' else "🔒 Not connected") | |
| # AI Questions Status | |
| st.markdown("## 🤖 AI Questions") | |
| # Show preferred model | |
| if st.session_state.preferred_ai_model != 'auto': | |
| st.info(f"🎯 {'النموذج المفضل' if st.session_state.language == 'ar' else 'Preferred Model'}: {st.session_state.preferred_ai_model}") | |
| else: | |
| st.info("🔄 " + ("تلقائي" if st.session_state.language == 'ar' else "Auto selection")) | |
| # Model reset button | |
| if st.button("🔄 " + ("إعادة تعيين النماذج" if st.session_state.language == 'ar' else "Reset AI Models"), help="إعادة تحميل نماذج الذكاء الاصطناعي" if st.session_state.language == 'ar' else "Reload AI models"): | |
| reset_ai_models() | |
| st.success("✅ " + ("تم إعادة تعيين النماذج" if st.session_state.language == 'ar' else "AI models reset successfully")) | |
| st.rerun() | |
| # Show preferred answer language | |
| answer_lang = st.session_state.get('preferred_answer_language', 'auto') | |
| if answer_lang != 'auto': | |
| lang_names = {'ar': '🇸🇦 العربية', 'en': '🇺🇸 English', 'fr': '🇫🇷 Français', 'es': '🇪🇸 Español', 'de': '🇩🇪 Deutsch', 'zh': '🇨🇳 中文'} | |
| lang_display = lang_names.get(answer_lang, answer_lang) | |
| st.info(f"🌐 {'لغة الإجابة' if st.session_state.language == 'ar' else 'Answer Language'}: {lang_display}") | |
| else: | |
| current_ui_lang = "🇸🇦 العربية" if st.session_state.language == 'ar' else "🇺🇸 English" | |
| st.info(f"🌐 {'لغة الإجابة' if st.session_state.language == 'ar' else 'Answer Language'}: {current_ui_lang} ({'تلقائي' if st.session_state.language == 'ar' else 'Auto'})") | |
| # Test AI services availability | |
| translator = get_translator() | |
| services_status = {} | |
| # Test Gemini | |
| try: | |
| if hasattr(translator, 'model') and translator.model: | |
| test_response = translator.model.generate_content("Test") | |
| services_status['Gemini'] = "✅" | |
| else: | |
| services_status['Gemini'] = "❌" | |
| except Exception as e: | |
| error_str = str(e) | |
| if "429" in error_str or "quota" in error_str.lower(): | |
| services_status['Gemini'] = "⚠️" | |
| else: | |
| services_status['Gemini'] = "❌" | |
| # Test Groq | |
| try: | |
| if hasattr(translator, '_groq_complete') and translator.groq_api_key: | |
| services_status['Groq'] = "✅" | |
| else: | |
| services_status['Groq'] = "❌" | |
| except Exception: | |
| services_status['Groq'] = "❌" | |
| # Test OpenRouter | |
| try: | |
| if hasattr(translator, '_openrouter_complete') and translator.openrouter_api_key: | |
| services_status['OpenRouter'] = "✅" | |
| else: | |
| services_status['OpenRouter'] = "❌" | |
| except Exception: | |
| services_status['OpenRouter'] = "❌" | |
| # Display services status | |
| st.markdown("**" + ("حالة النماذج" if st.session_state.language == 'ar' else "Models Status") + ":**") | |
| for service, status in services_status.items(): | |
| if status == "✅": | |
| st.success(f"{status} {service}") | |
| elif status == "⚠️": | |
| st.warning(f"{status} {service} (حد يومي)" if st.session_state.language == 'ar' else f"{status} {service} (quota)") | |
| else: | |
| st.error(f"{status} {service}") | |
| # Overall status | |
| available_count = sum(1 for status in services_status.values() if status == "✅") | |
| if available_count > 0: | |
| st.info(f"🤖 {available_count}/3 " + ("نماذج متاحة" if st.session_state.language == 'ar' else "models available")) | |
| else: | |
| st.warning("⚠️ جميع النماذج غير متاحة" if st.session_state.language == 'ar' else "⚠️ All models unavailable") | |
| # Show conversation status and usage stats | |
| question_engine = get_ai_question_engine() | |
| usage_stats = question_engine.get_model_usage_stats() | |
| total_questions = sum(usage_stats.values()) | |
| if st.session_state.current_question_session: | |
| session = question_engine.get_conversation_history(st.session_state.current_question_session) | |
| if session and session.conversation: | |
| st.info(f"💬 {len(session.conversation)} " + ("أسئلة نشطة" if st.session_state.language == 'ar' else "active questions")) | |
| else: | |
| st.info("🤖 جاهز للأسئلة" if st.session_state.language == 'ar' else "🤖 Ready for questions") | |
| else: | |
| st.info("🤖 جاهز للأسئلة" if st.session_state.language == 'ar' else "🤖 Ready for questions") | |
| # Show usage statistics | |
| if total_questions > 0: | |
| with st.expander("📊 إحصائيات الاستخدام" if st.session_state.language == 'ar' else "📊 Usage Statistics"): | |
| st.write(f"**{'إجمالي الأسئلة' if st.session_state.language == 'ar' else 'Total Questions'}: {total_questions}**") | |
| for model, count in usage_stats.items(): | |
| if count > 0: | |
| percentage = (count / total_questions) * 100 | |
| st.write(f"• {model}: {count} ({percentage:.1f}%)") | |
| else: | |
| st.caption("📊 لا توجد إحصائيات بعد" if st.session_state.language == 'ar' else "📊 No statistics yet") | |
| # Quick model switcher | |
| st.markdown("**" + ("تبديل سريع للنموذج" if st.session_state.language == 'ar' else "Quick Model Switch") + "**") | |
| question_engine = get_ai_question_engine() | |
| models_status = question_engine.check_model_availability() | |
| # Create buttons for each available model | |
| cols = st.columns(2) | |
| model_names = ['auto', 'Gemini AI', 'Groq AI', 'OpenRouter AI'] | |
| for i, model in enumerate(model_names): | |
| with cols[i % 2]: | |
| if model == 'auto': | |
| button_text = "🔄 تلقائي" if st.session_state.language == 'ar' else "🔄 Auto" | |
| is_current = st.session_state.preferred_ai_model == 'auto' | |
| else: | |
| status_info = models_status.get(model, {}) | |
| icon = status_info.get('icon', '❓') | |
| button_text = f"{icon} {model.split()[0]}" # Show first word + icon | |
| is_current = st.session_state.preferred_ai_model == model | |
| button_type = "primary" if is_current else "secondary" | |
| if st.button(button_text, key=f"switch_{model}", type=button_type, use_container_width=True): | |
| st.session_state.preferred_ai_model = model | |
| st.rerun() | |
| # Quick language switcher | |
| st.markdown("**" + ("تبديل سريع للغة" if st.session_state.language == 'ar' else "Quick Language Switch") + "**") | |
| language_buttons = { | |
| 'auto': "🔄 تلقائي" if st.session_state.language == 'ar' else "🔄 Auto", | |
| 'ar': "🇸🇦 عربي", | |
| 'en': "🇺🇸 EN", | |
| 'fr': "🇫🇷 FR", | |
| 'es': "🇪🇸 ES" | |
| } | |
| cols_lang = st.columns(3) | |
| for i, (lang_code, button_text) in enumerate(language_buttons.items()): | |
| with cols_lang[i % 3]: | |
| is_current_lang = st.session_state.get('preferred_answer_language', 'auto') == lang_code | |
| button_type_lang = "primary" if is_current_lang else "secondary" | |
| if st.button(button_text, key=f"switch_lang_{lang_code}", type=button_type_lang, use_container_width=True): | |
| st.session_state.preferred_answer_language = lang_code | |
| st.rerun() | |
| # Processing status | |
| if st.session_state.processing_queue or st.session_state.processing_results: | |
| st.markdown("## 🔄 " + ("حالة المعالجة" if st.session_state.language == 'ar' else "Processing Status")) | |
| # Queue status | |
| if st.session_state.processing_queue: | |
| queue_count = len(st.session_state.processing_queue) | |
| st.warning(f"⏳ {queue_count} " + ("في الانتظار" if st.session_state.language == 'ar' else "in queue")) | |
| # Results count | |
| if st.session_state.processing_results: | |
| results_count = len(st.session_state.processing_results) | |
| st.success(f"✅ {results_count} " + ("مكتمل" if st.session_state.language == 'ar' else "completed")) | |
| # Clear all button | |
| if st.button("🗑️ " + ("مسح الكل" if st.session_state.language == 'ar' else "Clear All")): | |
| st.session_state.processing_queue = [] | |
| st.session_state.processing_results = {} | |
| st.session_state.processing_status = {} | |
| st.rerun() | |
| st.title("🎵 SyncMaster") | |
| if st.session_state.language == 'ar': | |
| st.markdown("### منصة المزامنة الذكية بين الصوت والنص") | |
| else: | |
| st.markdown("### The Intelligent Audio-Text Synchronization Platform") | |
| # Simplified interface - removed step indicators as requested | |
| # Global settings for long recording retention and custom snapshot duration | |
| with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False): | |
| st.session_state.setdefault('retention_minutes', 30) | |
| # 0 means: use full buffer by default for Custom | |
| st.session_state.setdefault('custom_snapshot_seconds', 0) | |
| # Auto-Custom interval seconds (for frontend auto trigger) | |
| st.session_state.setdefault('auto_custom_interval_sec', 10) | |
| # Auto-start incremental snapshots when recording begins | |
| st.session_state.setdefault('auto_start_custom', False) | |
| st.session_state.retention_minutes = st.number_input("Retention window (minutes)", min_value=5, max_value=240, value=st.session_state.retention_minutes) | |
| st.session_state.custom_snapshot_seconds = st.number_input("Custom snapshot (seconds; 0 = full buffer)", min_value=0, max_value=3600, value=st.session_state.custom_snapshot_seconds) | |
| st.session_state.auto_custom_interval_sec = st.number_input("Auto Custom interval (seconds)", min_value=1, max_value=3600, value=st.session_state.auto_custom_interval_sec, help="How often to auto-trigger the same Custom action while recording.") | |
| st.session_state.auto_start_custom = st.checkbox("Auto-start incremental snapshots on record", value=st.session_state.auto_start_custom, help="Start sending Custom intervals automatically as soon as you start recording.") | |
| # Inject globals into the page for the component to pick up | |
| components.html(f""" | |
| <script> | |
| window.ST_AREC_RETENTION_MINUTES = {int(st.session_state.retention_minutes)}; | |
| window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS = {int(st.session_state.custom_snapshot_seconds)}; | |
| window.ST_AREC_LAST_FETCHED_END_MS = {int(st.session_state.get('lastFetchedEnd_ms', 0))}; | |
| window.ST_AREC_CUSTOM_AUTO_INTERVAL_SECONDS = {int(st.session_state.get('auto_custom_interval_sec', 10))}; | |
| window.ST_AREC_AUTO_START = {str(bool(st.session_state.get('auto_start_custom', True))).lower()}; | |
| console.log('Recorder config', window.ST_AREC_RETENTION_MINUTES, window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS); | |
| </script> | |
| """, height=0) | |
| if AUDIO_PROCESSOR_CLASS is None: | |
| st.error("Fatal Error: The application could not start correctly.") | |
| st.subheader("An error occurred while trying to import `AudioProcessor`:") | |
| st.code(IMPORT_ERROR_TRACEBACK, language="python") | |
| st.stop() | |
| step_1_upload_and_process() | |
| # Process background queue | |
| if st.session_state.get('background_processing', True) and st.session_state.processing_queue: | |
| process_queued_audio() | |
| # Show processing results optionally | |
| if st.session_state.get('show_processing_results', False): | |
| show_processing_results() | |
| elif st.session_state.processing_results: | |
| # Show a button to view results if there are any | |
| if st.button("📝 " + ("عرض نتائج المعالجة" if st.session_state.language == 'ar' else "Show Processing Results") + f" ({len(st.session_state.processing_results)})", type="secondary"): | |
| st.session_state.show_processing_results = True | |
| st.rerun() | |
| # Note: step_2_review_and_customize removed as requested | |
| # Results are now shown in show_processing_results() function | |
| # AI Question modal (show outside of other components) | |
| if st.session_state.show_question_modal: | |
| show_question_modal() | |
| # Export modal (show outside of other components) | |
| if st.session_state.show_export_modal: | |
| show_export_modal() | |
| # --- Show Processing Results --- | |
def show_processing_results():
    """Render the stored processing results inline on the current page.

    Returns immediately when ``st.session_state.processing_results`` is empty.
    Otherwise draws, in order:

    * a header with the number of results and a "Hide" button that clears
      ``st.session_state.show_processing_results`` and reruns the script;
    * when more than one result exists, a checkbox that switches on a combined
      view of all original texts and all translations;
    * one expander per result showing the original text, the translation, the
      detected language, and copy / delete buttons.
    """
    results = st.session_state.processing_results
    if not results:
        return

    is_arabic = st.session_state.language == 'ar'

    def tr(arabic, english):
        # Pick the label that matches the current interface language.
        return arabic if is_arabic else english

    st.markdown("---")

    # Header with results count and close button
    results_count = len(results)
    col_header, col_close = st.columns([4, 1])
    with col_header:
        st.subheader(f"📝 {tr('نتائج المعالجة', 'Processing Results')} ({results_count})")
    with col_close:
        if st.button("❌ " + tr("إخفاء", "Hide"), key="hide_results"):
            st.session_state.show_processing_results = False
            st.rerun()

    if results_count > 1:
        # Offer a single combined view of every result
        show_all = st.checkbox(
            tr("عرض جميع النتائج مجمعة", "Show all results combined"),
            help=tr("عرض جميع النصوص والترجمات في مكان واحد", "Display all texts and translations in one place"),
        )
        if show_all:
            st.markdown("### " + tr("النصوص الأصلية مجمعة", "Combined Original Texts"))
            combined_original = "\n\n".join(
                r['original_text'] for r in results.values() if r.get('original_text')
            )
            if combined_original:
                st.write(combined_original)
                if st.button("📋 " + tr("نسخ جميع النصوص", "Copy All Texts")):
                    # st.code renders the text in a block that is easy to copy
                    st.code(combined_original, language=None)

            st.markdown("### " + tr("الترجمات مجمعة", "Combined Translations"))
            combined_translation = "\n\n".join(
                r['translated_text'] for r in results.values() if r.get('translated_text')
            )
            if combined_translation:
                st.write(combined_translation)
                if st.button("📋 " + tr("نسخ جميع الترجمات", "Copy All Translations")):
                    st.code(combined_translation, language=None)

            st.markdown("---")

    # Iterate over a snapshot: the delete button below removes entries from
    # the very dict being displayed, and mutating a dict while iterating its
    # live view is only safe as long as nothing runs after the deletion.
    for task_id, result in list(results.items()):
        with st.expander(f"🎵 {tr('التسجيل', 'Recording')} {task_id}", expanded=True):
            # Original text in white container
            if result.get('original_text'):
                original_html = create_white_container(
                    tr("النص الأصلي", "Original Text"), result['original_text'], "📝"
                )
                st.markdown(original_html, unsafe_allow_html=True)

            # Translation in white container
            if result.get('translated_text'):
                translation_html = create_white_container(
                    tr("الترجمة", "Translation"), result['translated_text'], "🌐"
                )
                st.markdown(translation_html, unsafe_allow_html=True)

            # Language info
            if result.get('detected_language'):
                st.caption(f"🌐 {tr('اللغة المكتشفة', 'Detected language')}: {result['detected_language']}")

            # Action buttons
            col_copy_text, col_copy_translation, col_delete = st.columns(3)
            with col_copy_text:
                if st.button(f"📋 {tr('نسخ النص', 'Copy Text')}", key=f"copy_original_{task_id}"):
                    st.code(result.get('original_text', ''), language=None)
                    st.success("✅ " + tr("تم تنسيق النص للنسخ", "Text formatted for copying"))
            with col_copy_translation:
                # The button only exists when there is a translation to copy
                if result.get('translated_text') and st.button(
                    f"📋 {tr('نسخ الترجمة', 'Copy Translation')}", key=f"copy_translation_{task_id}"
                ):
                    st.code(result.get('translated_text', ''), language=None)
                    st.success("✅ " + tr("تم تنسيق الترجمة للنسخ", "Translation formatted for copying"))
            with col_delete:
                if st.button(f"🗑️ {tr('حذف', 'Delete')}", key=f"delete_{task_id}"):
                    del results[task_id]
                    # Drop the matching status entry, if any
                    st.session_state.processing_status.pop(task_id, None)
                    st.rerun()
| # --- Step 1: Upload and Process --- | |
def _ui_text(arabic, english):
    """Return *arabic* when the interface language is 'ar', otherwise *english*."""
    return arabic if st.session_state.language == 'ar' else english


def _trim_wav_prefix(wav_bytes, sample_rate, drop_ms):
    """Return *wav_bytes* with the first *drop_ms* milliseconds of audio removed.

    Only the layout the caller has always assumed is handled: a 44-byte
    RIFF/WAVE header followed by 16-bit mono PCM. The two size fields of the
    header are rewritten to match the shortened payload. Input that does not
    start with a RIFF/WAVE header, or a non-positive *drop_ms*, is returned
    unchanged.
    """
    header_size = 44
    if drop_ms <= 0:
        return wav_bytes
    if len(wav_bytes) < header_size or wav_bytes[0:4] != b'RIFF' or wav_bytes[8:12] != b'WAVE':
        # Not a recognizable WAV header; keep as-is
        return wav_bytes

    bytes_per_sample = 2  # PCM16 mono
    drop_bytes = int(sample_rate * (drop_ms / 1000.0)) * bytes_per_sample
    pcm_trim = wav_bytes[header_size:][drop_bytes:]  # empty when everything is dropped

    header = bytearray(wav_bytes[:header_size])
    # ChunkSize at offset 4 = 36 + Subchunk2Size
    header[4:8] = (36 + len(pcm_trim)).to_bytes(4, 'little')
    # Subchunk2Size at offset 40
    header[40:44] = len(pcm_trim).to_bytes(4, 'little')
    return bytes(header) + pcm_trim


def _transcribe_interval(wav_bytes):
    """Transcribe one WAV blob and return ``(text, model_used)``.

    The audio is written to a temporary ``.wav`` file because the processor
    methods take a path. Word timestamps are requested first; when they yield
    no words, plain transcription is tried as a fallback. The temporary file
    is always removed.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
        tmp_file.write(wav_bytes)
        tmp_path = tmp_file.name
    try:
        processor = AUDIO_PROCESSOR_CLASS()
        word_timestamps, _processor_logs, model_used = processor.get_word_timestamps(tmp_path)
        full_text = " ".join(d['word'] for d in word_timestamps) if word_timestamps else ""
        # Fallback: if timestamps extraction yielded no words, try plain transcription
        if not full_text:
            plain_text, _err, fallback_model = processor.transcribe_audio(tmp_path)
            if plain_text:
                full_text = plain_text.strip()
                model_used = fallback_model
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
    return full_text, model_used


def _start_segment_enrichment(seg, full_text):
    """Start a daemon thread that translates *seg* and refreshes the summary.

    Every value the worker needs from ``st.session_state`` is read here, on
    the script thread, and the script-run context is attached to the worker
    when Streamlit exposes ``add_script_run_ctx`` so that the final write of
    ``st.session_state.arabic_explanation`` reaches the right session.

    Failures inside the worker are best-effort: they are printed with
    ``traceback.print_exc()`` and never propagate to the UI.
    """
    import threading

    state = st.session_state
    want_translation = bool(full_text) and state.get('enable_translation', True)
    target_lang = state.get('broadcast_translation_lang', 'ar')
    if target_lang == 'detect':
        # The broadcast view translates to Arabic in "detect" mode; mirror that.
        target_lang = 'ar'
    want_summary = state.get('auto_generate_summary', True)
    summary_lang = state.get('summary_language', 'ar')
    source_text = " \n".join(s.get('text', '') for s in state.broadcast_segments if s.get('text'))

    def worker():
        if want_translation:
            try:
                translated, _ = get_translator().translate_text(full_text, target_language=target_lang)
                if translated:
                    seg['translations'][target_lang] = translated
            except Exception:
                traceback.print_exc()
        if want_summary and source_text.strip():
            try:
                summary, _ = generate_summary(source_text, target_language=summary_lang)
                if summary:
                    st.session_state.arabic_explanation = summary
            except Exception:
                traceback.print_exc()

    worker_thread = threading.Thread(target=worker, daemon=True)
    try:
        from streamlit.runtime.scriptrunner import add_script_run_ctx
    except ImportError:
        add_script_run_ctx = None
    if add_script_run_ctx is not None:
        add_script_run_ctx(worker_thread)
    worker_thread.start()


def _handle_interval_payload(payload):
    """Process one ``interval_wav`` / ``no_new`` payload from the recorder.

    For ``interval_wav`` the audio is trimmed so it does not overlap the
    previously fetched range, de-duplicated by MD5 checksum plus time window,
    transcribed, appended to ``st.session_state.broadcast_segments`` and to
    the transcript feed, and the script is rerun so the new segment shows up.
    Exceptions raised during transcription propagate after the progress
    placeholder has been cleared and ``cleanup_processing_state()`` called.
    """
    # Mark Custom interval flow active so Step 2 editor/style can be hidden
    st.session_state['_custom_active'] = True

    if payload['type'] == 'no_new':
        st.info("No new audio chunks yet.")
        return
    if payload['type'] != 'interval_wav':
        return

    wav_bytes = bytes(payload['bytes'])
    sample_rate = int(payload.get('sr', 16000))
    start_ms = int(payload['start_ms'])
    end_ms = int(payload['end_ms'])

    if end_ms <= start_ms:
        st.warning("The received interval is empty.")
        return

    # Prevent overlap with the prior segment
    last_end = st.session_state.lastFetchedEnd_ms or 0
    eff_start_ms = max(start_ms, last_end)
    if eff_start_ms >= end_ms:
        st.info("No new parts after the last point.")
        return

    try:
        wav_bytes = _trim_wav_prefix(wav_bytes, sample_rate, eff_start_ms - start_ms)
    except Exception:
        # Best effort: on any trimming problem keep the untrimmed audio.
        pass

    # The checksum is used for de-duplication only, not for security.
    digest = hashlib.md5(wav_bytes).hexdigest()
    already_stored = any(
        s.get('checksum') == digest and s.get('start_ms') == eff_start_ms and s.get('end_ms') == end_ms
        for s in st.session_state.broadcast_segments
    )
    if already_stored:
        st.info("Duplicate segment ignored.")
        return

    # Show processing feedback during extraction
    extraction_message = "Extracting text from interval..." if st.session_state.language == 'en' else "جاري استخراج النص من الفترة الزمنية..."
    feedback_placeholder = show_processing_feedback(extraction_message, st.session_state.language)
    try:
        full_text, model_used = _transcribe_interval(wav_bytes)
        feedback_placeholder.empty()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        raise

    # Append the segment immediately with only the original text
    seg = {
        'id': digest,
        'recording_id': payload.get('session_id', 'local'),
        'start_ms': eff_start_ms,
        'end_ms': end_ms,
        'checksum': digest,
        'text': full_text,
        'translations': {},
        'transcription_model': model_used,
    }
    st.session_state.broadcast_segments.append(seg)
    # Internal storage is kept oldest-first
    st.session_state.broadcast_segments.sort(key=lambda s: s.get('start_ms', 0))
    if st.session_state.get('debug_mode', False):
        st.write(f"Debug: Added segment #{len(st.session_state.broadcast_segments)}: {eff_start_ms/1000:.1f}s-{end_ms/1000:.1f}s")
    st.session_state.lastFetchedEnd_ms = end_ms

    if full_text:
        if digest not in st.session_state.transcript_ids:
            st.session_state.transcript_ids.add(digest)
            st.session_state.transcript_feed.insert(
                0,
                {
                    "id": digest,
                    "ts": int(time.time() * 1000),
                    "text": full_text,
                },
            )
        st.session_state.edited_text = "\n\n".join(
            entry["text"] for entry in st.session_state.transcript_feed
        )

    st.success(f"✅ {_ui_text('تم إضافة مقطع جديد', 'Added new segment')}: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s")

    # The background work has to be started BEFORE st.rerun(): rerun stops the
    # current script run immediately, so nothing placed after it ever executes.
    _start_segment_enrichment(seg, full_text)

    # Force UI refresh to show the new segment immediately in the broadcast stack
    st.rerun()


def _handle_full_recording(wav_audio_data):
    """Handle the legacy payload shape: the whole recording as WAV bytes.

    Stores the audio in ``st.session_state.audio_data``, shows a player and
    either queues it automatically (when ``auto_process_snapshots`` is on and
    the MD5 digest differs from the last one queued) or shows an
    "Extract Text" button that queues it on click.
    """
    bytes_data = bytes(wav_audio_data)
    # This is not the Custom interval mode
    st.session_state['_custom_active'] = False
    st.session_state.audio_data = bytes_data
    st.audio(bytes_data)

    digest = hashlib.md5(bytes_data).hexdigest()
    last_digest = st.session_state.get('_last_component_digest')
    if st.session_state.auto_process_snapshots and digest != last_digest:
        st.session_state['_last_component_digest'] = digest
        task_id = run_audio_processing(bytes_data, "snapshot.wav")
        if task_id:
            st.success(f"🔄 {_ui_text('تم إضافة المقطع للمعالجة', 'Snapshot queued for processing')}")
    elif st.button("📝 " + _ui_text("استخراج النص", "Extract Text"), type="primary", use_container_width=True):
        st.session_state['_last_component_digest'] = digest
        task_id = run_audio_processing(bytes_data, "recorded_audio.wav")
        if task_id:
            st.success(f"✅ {_ui_text('تم إضافة التسجيل للمعالجة', 'Audio queued for processing')}")


def _render_google_docs_controls():
    """Draw the "Export to Google Docs" button and the logout button / login status."""
    col_export, col_logout = st.columns([3, 1])
    with col_export:
        export_button_text = _ui_text("📤 تصدير إلى Google Docs", "📤 Export to Google Docs")
        if st.button(export_button_text, type="primary", use_container_width=True):
            export_to_google_docs_directly()
    with col_logout:
        if google_docs_manager.is_authenticated():
            logout_text = _ui_text("🚪 خروج", "🚪 Logout")
            if st.button(logout_text, use_container_width=True, help=_ui_text("تسجيل الخروج من Google", "Logout from Google")):
                logout_from_google()
        else:
            login_status = _ui_text("غير متصل", "Not logged in")
            st.caption(f"🔒 {login_status}")


def _render_processing_settings():
    """Draw the auto-process and background-processing checkboxes.

    Both widgets are bound to session-state keys that are seeded with
    ``setdefault`` first, so the widgets take their initial value from session
    state. No ``value=`` argument is passed: combining a seeded key with a
    non-default ``value`` makes Streamlit emit a warning.
    """
    st.markdown("**" + _ui_text("إعدادات المعالجة", "Processing Settings") + "**")

    st.session_state.setdefault('auto_process_snapshots', False)
    st.checkbox(
        _ui_text("معالجة تلقائية للمقاطع", "Auto-process snapshots"),
        key='auto_process_snapshots',
        help=_ui_text(
            "عند التفعيل، يتم معالجة المقاطع تلقائياً أثناء التسجيل",
            "When enabled, snapshots are processed automatically during recording",
        ),
    )

    st.session_state.setdefault('background_processing', True)
    st.checkbox(
        _ui_text("معالجة في الخلفية", "Background processing"),
        key='background_processing',
        help=_ui_text(
            "يسمح بالاستمرار في استخدام التطبيق أثناء المعالجة",
            "Allows continued use of the app during processing",
        ),
    )


def _render_broadcast_segment(position, segment, sel_code):
    """Draw one broadcast segment: bubble, "Ask" button, model note, translation.

    *position* is the 1-based number shown in the bubble, *sel_code* the
    selected translation language code or ``'detect'`` (translated to Arabic).
    A missing translation is requested on the spot and cached in
    ``segment['translations']``.
    """
    segment_id = segment.get('id', f"seg_{segment['start_ms']}_{segment['end_ms']}")
    original_text = segment.get('text', '')
    if not original_text:
        return

    is_selected = (st.session_state.selected_segment_id == segment_id)
    timestamp = f"#{position} • {segment['start_ms']/1000:.1f}s → {segment['end_ms']/1000:.1f}s"

    col_bubble, col_ask = st.columns([5, 1])
    with col_bubble:
        st.markdown(create_broadcast_bubble(original_text, timestamp, is_selected), unsafe_allow_html=True)
        if is_selected:
            st.success("🔍 " + _ui_text("هذا النص محدد للأسئلة", "This text is selected for questions"))
    with col_ask:
        ask_button_text = _ui_text("🤖 اسأل", "🤖 Ask")
        button_type = "secondary" if is_selected else "primary"
        if st.button(
            ask_button_text,
            key=f"ask_{segment_id}",
            type=button_type,
            use_container_width=True,
            help=_ui_text("اسأل الذكاء الاصطناعي عن هذا النص", "Ask AI about this text"),
        ):
            # Select this text and open the question modal
            st.session_state.selected_text = original_text
            st.session_state.selected_segment_id = segment_id
            st.session_state.show_question_modal = True
            st.rerun()

    model_note = segment.get('transcription_model', None)
    if model_note:
        st.caption(f"Model used: {model_note}")

    if st.session_state.get('enable_translation', True):
        translations = segment.get('translations')
        if not isinstance(translations, dict):
            translations = segment['translations'] = {}
        # "detect" mode always translates to Arabic
        target_lang = 'ar' if sel_code == 'detect' else sel_code
        if target_lang not in translations:
            try:
                translated, _ = get_translator().translate_text(original_text, target_language=target_lang)
                if translated:
                    translations[target_lang] = translated
            except Exception:
                # Best effort: a failed translation must not break the view;
                # it is attempted again on the next rerun.
                pass
        if translations.get(target_lang):
            st.caption(f"Translation ({target_lang.upper()}):")
            st.write(translations[target_lang])

    st.divider()


def _render_broadcast_view():
    """Draw the "Broadcast" expander: language selector plus all segments, newest first.

    The selected language code is stored in
    ``st.session_state.broadcast_translation_lang``. Segments without text or
    without ``start_ms`` are reported and removed from
    ``st.session_state.broadcast_segments`` before rendering.
    """
    with st.expander("📻 Broadcast (latest first)", expanded=True):
        # Language selector for broadcast translations
        try:
            langs = get_translator().get_supported_languages()
            codes = list(langs.keys())
            labels = ["detect language — Arabic (العربية)"] + [f"{code} — {langs[code]}" for code in codes]
            current = st.session_state.get('broadcast_translation_lang', 'ar')
            # Anything unknown falls back to 'detect'
            if current not in codes and current != 'detect':
                current = 'detect'
            default_index = 0 if current == 'detect' else codes.index(current) + 1
            sel_label = st.selectbox("Broadcast translation language", labels, index=default_index)
            if sel_label.startswith("detect language"):
                sel_code = 'detect'
            else:
                sel_code = sel_label.split(' — ')[0]
            st.session_state.broadcast_translation_lang = sel_code
        except Exception:
            # Translator unavailable: keep whatever language was stored last
            sel_code = st.session_state.get('broadcast_translation_lang', 'ar')

        if not st.session_state.broadcast_segments:
            st.caption("No segments yet. Use the Custom button while recording.")
            return

        # Show all segments (no pagination for broadcast view)
        total_segments = len(st.session_state.broadcast_segments)
        valid_segments = [
            s for s in st.session_state.broadcast_segments
            if s.get('text') and s.get('start_ms') is not None
        ]
        if len(valid_segments) != total_segments:
            st.warning(f"Found {total_segments - len(valid_segments)} invalid segments. Showing {len(valid_segments)} valid segments.")
            total_segments = len(valid_segments)
            st.session_state.broadcast_segments = valid_segments

        banner_title = _ui_text('البث المباشر', 'Live Broadcast')
        banner_unit = _ui_text('مقطع', 'segments')
        st.markdown(f"""
        <div style="
        background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
        color: white;
        padding: 8px 16px;
        border-radius: 20px;
        text-align: center;
        font-weight: 600;
        margin-bottom: 15px;
        box-shadow: 0 2px 10px rgba(102, 126, 234, 0.3);
        ">
        📻 {banner_title} • {total_segments} {banner_unit}
        </div>
        """, unsafe_allow_html=True)

        # Newest first; sorted fresh on every run
        newest_first = sorted(
            st.session_state.broadcast_segments,
            key=lambda s: s.get('start_ms', 0),
            reverse=True,
        )
        for position, segment in enumerate(newest_first, 1):
            _render_broadcast_segment(position, segment, sel_code)


def step_1_upload_and_process():
    """Render the "Audio Source" step: upload tab, record tab and broadcast view.

    Upload tab: lets the user pick an MP3/WAV/M4A file, stores its bytes in
    ``st.session_state.audio_data`` and starts processing on request.

    Record tab: shows the recorder component, the Google Docs export/logout
    controls and the processing settings, then dispatches the component value:
    dict payloads of type ``interval_wav`` / ``no_new`` go through the interval
    pipeline, anything else that is not a dict is treated as a complete WAV
    recording. The broadcast view is rendered last.
    """
    st.header("🎵 " + _ui_text("مصدر الصوت", "Audio Source"))
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])

    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader("Choose an audio file", type=['mp3', 'wav', 'm4a'], help="Supported formats: MP3, WAV, M4A")
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)
        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()

    with record_tab:
        st.subheader("Record audio directly from your microphone")
        if st.session_state.language == 'ar':
            st.info("🎙️ **تحكم بسيط في التسجيل:**\n- اضغط الميكروفون لبدء التسجيل\n- اضغط مرة أخرى للتوقف\n- استخدم الأزرار أدناه للتحكم الإضافي")
        else:
            st.info("🎙️ **Simple Recording Controls:**\n- Click microphone to start recording\n- Click again to stop\n- Use buttons below for additional control")

        # Empty two-column row kept so the page layout stays as it was; the
        # recording controls that used to live here are in the component now.
        st.columns([2, 1])

        recording_status_placeholder = st.empty()
        wav_audio_data = st_audiorec()

        # Recording status and basic controls
        if wav_audio_data:
            if isinstance(wav_audio_data, bytes):
                recording_status_placeholder.success("🎵 " + _ui_text("تسجيل جاهز للمعالجة", "Recording ready for processing"))
                col_play, col_clear = st.columns(2)
                with col_play:
                    st.audio(wav_audio_data, format='audio/wav')
                with col_clear:
                    if st.button("🗑️ " + _ui_text("مسح التسجيل", "Clear Recording")):
                        st.rerun()
            elif isinstance(wav_audio_data, dict):
                recording_status_placeholder.info("🔄 " + _ui_text("معالجة المقاطع...", "Processing intervals..."))

        _render_google_docs_controls()
        _render_processing_settings()

        if wav_audio_data:
            if isinstance(wav_audio_data, dict):
                if wav_audio_data.get('type') in ('interval_wav', 'no_new'):
                    _handle_interval_payload(wav_audio_data)
                # Any other dict carries no raw audio; passing it to bytes()
                # would raise TypeError, so it is ignored.
            else:
                _handle_full_recording(wav_audio_data)

        # Always show the Broadcast view in Step 1 (regardless of transcription_data).
        # NOTE(review): placed inside the Record tab; the source had lost its
        # indentation, so confirm this matches the intended layout.
        _render_broadcast_view()
| # --- Google Logout Function --- | |
def logout_from_google():
    """Log out of the Google account through ``google_docs_manager``.

    On success a confirmation and a hint are shown, the script pauses for one
    second and then reruns. When ``logout()`` returns a falsy value an error
    message is shown instead. Any exception is reported as an unexpected error
    rather than propagated.
    """
    try:
        logged_out = google_docs_manager.logout()
        arabic_ui = st.session_state.language == 'ar'
        if not logged_out:
            st.error("خطأ في تسجيل الخروج" if arabic_ui else "Error during logout")
            return
        st.success("تم تسجيل الخروج بنجاح!" if arabic_ui else "Successfully logged out!")
        st.info("يمكنك الآن تسجيل الدخول بحساب آخر" if arabic_ui else "You can now login with a different account")
        # Short pause before the rerun that refreshes the UI
        # (presumably so the messages above are visible for a moment).
        time.sleep(1)
        st.rerun()
    except Exception as exc:
        if st.session_state.language == 'ar':
            st.error(f"خطأ غير متوقع: {str(exc)}")
        else:
            st.error(f"Unexpected error: {str(exc)}")
| # --- Direct Google Docs Export Function --- | |
def export_to_google_docs_directly():
    """Export all broadcast segments to Google Docs and report the outcome.

    Every segment in ``st.session_state.broadcast_segments`` is passed to
    ``google_docs_manager.export_broadcast_to_docs`` (an empty list when there
    are none). While the export runs a progress placeholder is shown; it is
    cleared on success and on failure. On success the document link and raw
    URL are displayed; otherwise the returned error is shown, with a pointer
    to GOOGLE_SETUP.md when the message mentions credentials or
    authentication. Any exception is reported as an unexpected error rather
    than propagated.
    """
    try:
        # All segments, not filtered by timestamp
        segments = st.session_state.broadcast_segments or []
        language = st.session_state.language
        is_arabic = language == 'ar'

        export_message = "جاري التصدير إلى Google Docs..." if is_arabic else "Exporting to Google Docs..."
        feedback_placeholder = show_processing_feedback(export_message, language)
        try:
            doc_url, error = google_docs_manager.export_broadcast_to_docs(
                segments,
                ui_language=language,
            )
            feedback_placeholder.empty()
        except Exception:
            feedback_placeholder.empty()
            cleanup_processing_state()
            # Bare raise keeps the original traceback for the outer handler
            raise

        if doc_url and not error:
            st.success("تم إنشاء المستند بنجاح!" if is_arabic else "Document created successfully!")
            # Clickable link
            if is_arabic:
                st.markdown(f"🔗 [فتح المستند في Google Docs]({doc_url})")
                st.info("💡 نصيحة: اضغط على الرابط أعلاه لفتح المستند في تبويب جديد")
            else:
                st.markdown(f"🔗 [Open Document in Google Docs]({doc_url})")
                st.info("💡 Tip: Click the link above to open the document in a new tab")
            # Also show the URL for copying
            st.code(doc_url, language=None)
            if google_docs_manager.is_authenticated():
                st.caption("✅ متصل بحساب Google" if is_arabic else "✅ Connected to Google account")
            return

        # str() guards the .lower() check below against a non-string error value
        error_msg = str(error) if error else "Unknown error occurred"
        st.error(f"خطأ في التصدير: {error_msg}" if is_arabic else f"Export error: {error_msg}")
        # Show setup instructions if credentials are missing
        lowered = error_msg.lower()
        if "credentials" in lowered or "authentication" in lowered:
            st.info("📋 يرجى مراجعة ملف GOOGLE_SETUP.md لإعداد Google Docs" if is_arabic else "📋 Please check GOOGLE_SETUP.md for Google Docs setup instructions")
    except Exception as e:
        st.error(f"خطأ غير متوقع: {str(e)}" if st.session_state.language == 'ar' else f"Unexpected error: {str(e)}")
| # --- AI Question Modal Function --- | |
| def show_question_modal(): | |
| """Display AI question modal for selected text""" | |
| if not st.session_state.selected_text: | |
| st.session_state.show_question_modal = False | |
| return | |
| # Get AI question engine | |
| question_engine = get_ai_question_engine() | |
| # Modal header | |
| st.subheader("🤖 اسأل الذكاء الاصطناعي" if st.session_state.language == 'ar' else "🤖 Ask AI") | |
| # Show selected text | |
| with st.expander("النص المحدد" if st.session_state.language == 'ar' else "Selected Text", expanded=True): | |
| st.write(f"📝 {st.session_state.selected_text}") | |
| # Show conversation history if exists | |
| if st.session_state.current_question_session: | |
| session = question_engine.get_conversation_history(st.session_state.current_question_session) | |
| if session and session.conversation: | |
| with st.expander(f"💬 تاريخ المحادثة ({len(session.conversation)} أسئلة)" if st.session_state.language == 'ar' else f"💬 Conversation History ({len(session.conversation)} questions)", expanded=False): | |
| for i, qa in enumerate(session.conversation, 1): | |
| st.markdown(f"**{i}. {qa.question}**") | |
| st.write(qa.answer) | |
| # Show timing and model info | |
| model_info = getattr(qa, 'model_used', 'Unknown') | |
| model_color = "green" if "Gemini" in model_info else "orange" if "Groq" in model_info or "OpenRouter" in model_info else "red" | |
| caption_text = f"⏱️ {qa.timestamp.strftime('%H:%M:%S')} - {qa.response_time_ms}ms" | |
| model_text = f"🔧 {model_info}" | |
| st.caption(caption_text) | |
| st.markdown(f"<small style='color: {model_color}'>{model_text}</small>", unsafe_allow_html=True) | |
| if i < len(session.conversation): | |
| st.divider() | |
| # Model selection (moved to top) | |
| st.markdown("**" + ("اختيار النموذج" if st.session_state.language == 'ar' else "Model Selection") + "**") | |
| # Get model availability | |
| models_status = question_engine.check_model_availability() | |
| # Create model options with status indicators | |
| model_options = {} | |
| for model_name, status_info in models_status.items(): | |
| display_name = f"{status_info['icon']} {model_name} - {status_info['message']}" | |
| model_options[display_name] = model_name | |
| # Add auto option | |
| auto_text = "🔄 تلقائي (أفضل نموذج متاح)" if st.session_state.language == 'ar' else "🔄 Auto (Best available model)" | |
| model_options = {auto_text: 'auto', **model_options} | |
| # Find current selection index | |
| current_model = st.session_state.get('preferred_ai_model', 'auto') | |
| current_index = 0 | |
| for i, (display_name, model_name) in enumerate(model_options.items()): | |
| if model_name == current_model: | |
| current_index = i | |
| break | |
| # Model selector | |
| selected_model_display = st.selectbox( | |
| "النموذج المفضل" if st.session_state.language == 'ar' else "Preferred Model", | |
| options=list(model_options.keys()), | |
| index=current_index, | |
| help="اختر النموذج المفضل للإجابة" if st.session_state.language == 'ar' else "Choose preferred model for answering" | |
| ) | |
| selected_model = model_options[selected_model_display] | |
| st.session_state.preferred_ai_model = selected_model | |
| # Show model status details | |
| if selected_model != 'auto': | |
| status_info = models_status[selected_model] | |
| if status_info['status'] != 'available': | |
| if status_info['status'] == 'quota_exceeded': | |
| st.warning("⚠️ هذا النموذج استنفد حصته اليومية" if st.session_state.language == 'ar' else "⚠️ This model has exceeded its daily quota") | |
| elif status_info['status'] == 'not_configured': | |
| st.info("ℹ️ هذا النموذج غير مُعد - سيتم استخدام البديل" if st.session_state.language == 'ar' else "ℹ️ This model is not configured - fallback will be used") | |
| else: | |
| st.error("❌ هذا النموذج غير متاح حالياً" if st.session_state.language == 'ar' else "❌ This model is currently unavailable") | |
| # Language selection for answers | |
| st.markdown("**" + ("لغة الإجابة" if st.session_state.language == 'ar' else "Answer Language") + "**") | |
| # Language options | |
| language_options = { | |
| "🔄 تلقائي (حسب لغة الواجهة)" if st.session_state.language == 'ar' else "🔄 Auto (Interface language)": 'auto', | |
| "🇸🇦 العربية": 'ar', | |
| "🇺🇸 English": 'en', | |
| "🇫🇷 Français": 'fr', | |
| "🇪🇸 Español": 'es', | |
| "🇩🇪 Deutsch": 'de', | |
| "🇨🇳 中文": 'zh' | |
| } | |
| # Find current language selection | |
| current_lang = st.session_state.get('preferred_answer_language', 'auto') | |
| current_lang_index = 0 | |
| for i, (display_name, lang_code) in enumerate(language_options.items()): | |
| if lang_code == current_lang: | |
| current_lang_index = i | |
| break | |
| # Language selector | |
| selected_language_display = st.selectbox( | |
| "لغة الإجابة المفضلة" if st.session_state.language == 'ar' else "Preferred Answer Language", | |
| options=list(language_options.keys()), | |
| index=current_lang_index, | |
| help="اختر اللغة التي تريد الحصول على الإجابة بها" if st.session_state.language == 'ar' else "Choose the language for AI responses" | |
| ) | |
| selected_answer_language = language_options[selected_language_display] | |
| st.session_state.preferred_answer_language = selected_answer_language | |
| # Show language info | |
| if selected_answer_language == 'auto': | |
| current_ui_lang = "العربية" if st.session_state.language == 'ar' else "English" | |
| st.caption(f"ℹ️ سيتم استخدام لغة الواجهة الحالية: {current_ui_lang}" if st.session_state.language == 'ar' else f"ℹ️ Will use current interface language: {current_ui_lang}") | |
| else: | |
| lang_names = {'ar': 'العربية', 'en': 'English', 'fr': 'Français', 'es': 'Español', 'de': 'Deutsch', 'zh': '中文'} | |
| selected_lang_name = lang_names.get(selected_answer_language, selected_answer_language) | |
| st.caption(f"ℹ️ الإجابات ستكون باللغة: {selected_lang_name}" if st.session_state.language == 'ar' else f"ℹ️ Answers will be in: {selected_lang_name}") | |
| # Question templates | |
| st.markdown("**" + ("قوالب الأسئلة السريعة" if st.session_state.language == 'ar' else "Quick Question Templates") + "**") | |
| templates = question_engine.get_question_templates(st.session_state.language) | |
| # Display templates as buttons in columns | |
| cols = st.columns(2) | |
| for i, template in enumerate(templates[:6]): # Show first 6 templates | |
| with cols[i % 2]: | |
| if st.button(template, key=f"template_{i}", use_container_width=True): | |
| # Process template question | |
| process_ai_question(template, is_template=True, preferred_model=selected_model, answer_language=selected_answer_language) | |
| return | |
| # Custom question input | |
| st.markdown("**" + ("أو اكتب سؤالك الخاص" if st.session_state.language == 'ar' else "Or Write Your Own Question") + "**") | |
| custom_question = st.text_area( | |
| "سؤالك" if st.session_state.language == 'ar' else "Your Question", | |
| placeholder="اكتب سؤالك هنا..." if st.session_state.language == 'ar' else "Type your question here...", | |
| height=100 | |
| ) | |
| # Action buttons | |
| col_ask, col_cancel = st.columns(2) | |
| with col_ask: | |
| if st.button("🚀 اسأل" if st.session_state.language == 'ar' else "🚀 Ask", type="primary", disabled=not custom_question.strip()): | |
| if custom_question.strip(): | |
| process_ai_question(custom_question.strip(), is_template=False, preferred_model=selected_model, answer_language=selected_answer_language) | |
| return | |
| with col_cancel: | |
| if st.button("❌ إلغاء" if st.session_state.language == 'ar' else "❌ Cancel"): | |
| st.session_state.show_question_modal = False | |
| st.session_state.selected_text = None | |
| st.session_state.selected_segment_id = None | |
| st.rerun() | |
def process_ai_question(question: str, is_template: bool = False, preferred_model: str = 'auto', answer_language: str = 'auto'):
    """Send a question about the selected text to the AI engine and render the result.

    Reads ``selected_text``, ``selected_segment_id``, ``current_question_session``
    and ``language`` from ``st.session_state``, calls
    ``question_engine.process_question`` and then renders either the answer
    (with model/language footer and Copy / Follow-up / Close buttons) or the
    error (with Retry / Close buttons). ``current_question_session`` is updated
    with the session id returned by the engine.

    Args:
        question: The question text to ask.
        is_template: Whether the question came from a quick template. It is
            only forwarded on retry; nothing else in this function reads it.
        preferred_model: Model name passed to the engine, or ``'auto'``.
        answer_language: Language code for the answer; ``'auto'`` means the
            current UI language is used.

    Raises:
        Exception: Anything raised while querying the engine is re-raised after
            the processing indicator is cleared and ``cleanup_processing_state``
            has run.

    NOTE(review): ``st.button`` reports a click only on the rerun that the click
    triggers. When this function is reached from inside another
    ``if st.button(...)`` branch, the buttons rendered below (Copy, Follow-up,
    Close, Retry) are presumably not rendered on the next rerun, so their
    handlers would never execute. Confirm, and consider driving this view from
    session state instead.
    """
    is_ar = st.session_state.language == 'ar'
    question_engine = get_ai_question_engine()

    # Prepare segment info
    segment_info = {
        'id': st.session_state.selected_segment_id,
        'start_ms': 0,  # We'll get this from the actual segment if needed
        'end_ms': 0,
    }

    # Show processing indicator using custom feedback
    processing_message = "جاري معالجة سؤالك..." if is_ar else "Processing your question..."
    feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)
    try:
        # 'auto' resolves to the current interface language
        answer_lang = st.session_state.language if answer_language == 'auto' else answer_language

        result = question_engine.process_question(
            selected_text=st.session_state.selected_text,
            question=question,
            segment_info=segment_info,
            ui_language=answer_lang,  # Use selected answer language
            session_id=st.session_state.current_question_session,
            preferred_model=preferred_model,
        )

        # Handle different return formats for backward compatibility:
        # a 4-tuple carries the model name, a 3-tuple does not.
        if len(result) == 4:
            answer, error, session_id, model_used = result
        else:
            answer, error, session_id = result
            model_used = "Unknown"

        # Remove the processing message
        feedback_placeholder.empty()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        raise  # bare raise keeps the original traceback intact

    # Update session ID
    st.session_state.current_question_session = session_id

    if answer:
        # Check response type and model fallback
        is_simple_response = "ملاحظة: هذه إجابة مبسطة" in answer or "Note: This is a simplified response" in answer
        # The notices compare against the model stored in session state; a
        # separate local is used so the `preferred_model` argument is not lost.
        selected_model = st.session_state.get('preferred_ai_model', 'auto')
        model_fallback = selected_model != 'auto' and selected_model != model_used

        if is_simple_response:
            st.warning("⚠️ خدمة الذكاء الاصطناعي غير متاحة حالياً - إجابة مبسطة" if is_ar else "⚠️ AI service temporarily unavailable - simplified response")
        elif model_fallback:
            st.info(f"ℹ️ النموذج المفضل ({selected_model}) غير متاح - تم استخدام {model_used}" if is_ar else f"ℹ️ Preferred model ({selected_model}) unavailable - used {model_used}")
        else:
            st.success("تم الحصول على الإجابة!" if is_ar else "Got the answer!")

        # Display Q&A
        st.markdown("### " + ("السؤال" if is_ar else "Question"))
        st.write(f"❓ {question}")
        st.markdown("### " + ("الإجابة" if is_ar else "Answer"))
        st.write(f"🤖 {answer}")

        # Show which model was used with enhanced styling
        if model_used:
            # Get model status for better display (icon / color per model)
            models_status = question_engine.check_model_availability()
            model_info = models_status.get(model_used, {})
            icon = model_info.get('icon', '🤖')
            color = model_info.get('color', 'gray')

            # Show if user's preferred model was used or fallback occurred
            if model_fallback:
                fallback_msg = " (تم التبديل للبديل)" if is_ar else " (fallback used)"
                color = "orange"
            else:
                fallback_msg = ""
            model_display = f"{icon} {model_used}{fallback_msg}"

            # Show answer language info (the stored preference, not the resolved code)
            display_lang_code = st.session_state.get('preferred_answer_language', 'auto')
            if display_lang_code == 'auto':
                lang_display = "تلقائي" if is_ar else "Auto"
                lang_flag = "🔄"
            else:
                lang_flags = {'ar': '🇸🇦', 'en': '🇺🇸', 'fr': '🇫🇷', 'es': '🇪🇸', 'de': '🇩🇪', 'zh': '🇨🇳'}
                lang_names = {'ar': 'العربية', 'en': 'English', 'fr': 'Français', 'es': 'Español', 'de': 'Deutsch', 'zh': '中文'}
                lang_flag = lang_flags.get(display_lang_code, '🌐')
                lang_display = lang_names.get(display_lang_code, display_lang_code)

            model_label = 'النموذج' if is_ar else 'Model'
            language_label = 'اللغة' if is_ar else 'Language'
            info_text = f"🔧 {model_label}: {model_display} | 🌐 {language_label}: {lang_flag} {lang_display}"
            st.markdown(f"<div style='background-color: rgba(0,0,0,0.1); padding: 8px; border-radius: 5px; margin: 5px 0;'><small style='color: {color}'>{info_text}</small></div>", unsafe_allow_html=True)

        # Show additional help for simple responses
        if is_simple_response:
            with st.expander("💡 نصائح للحصول على إجابات أفضل" if is_ar else "💡 Tips for better answers"):
                if is_ar:
                    st.markdown("""
                    **لماذا الإجابة مبسطة؟**
                    - تم استنفاد الحد اليومي لخدمة Gemini AI المجانية (50 طلب/يوم)
                    - النظام يستخدم إجابات مبسطة كبديل مؤقت
                    **للحصول على إجابات أفضل:**
                    - حاول مرة أخرى غداً (يتم تجديد الحد اليومي)
                    - اطرح أسئلة أكثر تحديداً
                    - ابحث في مصادر إضافية للموضوع
                    """)
                else:
                    st.markdown("""
                    **Why is the answer simplified?**
                    - Daily limit for free Gemini AI service exceeded (50 requests/day)
                    - System is using simplified responses as temporary fallback
                    **For better answers:**
                    - Try again tomorrow (daily limit resets)
                    - Ask more specific questions
                    - Search additional sources for the topic
                    """)

        # Action buttons for the response
        col_copy, col_follow, col_close = st.columns(3)
        with col_copy:
            if st.button("📋 نسخ" if is_ar else "📋 Copy"):
                # Format for copying
                copy_text = f"السؤال: {question}\nالإجابة: {answer}" if is_ar else f"Question: {question}\nAnswer: {answer}"
                st.code(copy_text, language=None)
                st.success("تم تنسيق النص للنسخ أعلاه" if is_ar else "Text formatted for copying above")
        with col_follow:
            if st.button("➕ سؤال متابعة" if is_ar else "➕ Follow-up"):
                # Keep modal open for follow-up question
                st.rerun()
        with col_close:
            if st.button("✅ إغلاق" if is_ar else "✅ Close"):
                st.session_state.show_question_modal = False
                st.session_state.selected_text = None
                st.session_state.selected_segment_id = None
                st.rerun()
    else:
        # Show error
        st.error(f"خطأ: {error}" if is_ar else f"Error: {error}")

        # Retry and close buttons
        col_retry, col_close = st.columns(2)
        with col_retry:
            if st.button("🔄 إعادة المحاولة" if is_ar else "🔄 Retry"):
                # Forward the caller's model and language choices; omitting
                # them would silently retry with the 'auto' defaults.
                process_ai_question(
                    question,
                    is_template,
                    preferred_model=preferred_model,
                    answer_language=answer_language,
                )
                return
        with col_close:
            if st.button("❌ إغلاق" if is_ar else "❌ Close"):
                st.session_state.show_question_modal = False
                st.session_state.selected_text = None
                st.session_state.selected_segment_id = None
                st.rerun()
| # --- Export Modal Function --- | |
def show_export_modal():
    """Render the export dialog: segment preview, format options and export actions.

    Only segments whose ``start_ms`` is at or after
    ``st.session_state.export_timestamp`` are offered for export, ordered by
    start time. If there is no export timestamp or no broadcast segments, the
    modal flag is cleared and nothing is rendered. The chosen format is stored
    in ``st.session_state.export_format``; when Google Docs is selected the
    authentication state is shown and the Export button is disabled until the
    user is authenticated.
    """
    import html  # local import: used only to escape the OAuth URL below

    is_ar = st.session_state.language == 'ar'

    # Initialize Google Docs auth (one instance kept in session state)
    if 'google_auth' not in st.session_state:
        st.session_state.google_auth = GoogleDocsAuth()
    google_auth = st.session_state.google_auth

    # Nothing to export without a timestamp and at least one segment
    if not st.session_state.export_timestamp or not st.session_state.broadcast_segments:
        st.session_state.show_export_modal = False
        return

    # Segments at/after the export timestamp, oldest first for export
    export_from = st.session_state.export_timestamp
    filtered_segments = sorted(
        (seg for seg in st.session_state.broadcast_segments if seg.get('start_ms', 0) >= export_from),
        key=lambda s: s.get('start_ms', 0),
    )

    if not filtered_segments:
        st.warning("لا توجد مقاطع جديدة للتصدير منذ الضغط على الزر" if is_ar else "No new segments to export since button press")
        if st.button("إغلاق" if is_ar else "Close"):
            st.session_state.show_export_modal = False
            st.rerun()
        return

    # Export preview
    st.subheader("📋 معاينة التصدير" if is_ar else "📋 Export Preview")
    # export_timestamp is divided by 1000 before being read as an epoch time
    export_time = datetime.fromtimestamp(st.session_state.export_timestamp / 1000)
    st.info(f"{'المقاطع من وقت' if is_ar else 'Segments from'}: {export_time.strftime('%H:%M:%S')}")
    st.info(f"{'عدد المقاطع' if is_ar else 'Number of segments'}: {len(filtered_segments)}")

    # Show preview of segments
    with st.expander("معاينة المحتوى" if is_ar else "Content Preview", expanded=False):
        preview_segments = filtered_segments[:3]  # Show first 3 segments
        for i, segment in enumerate(preview_segments):
            start_time = segment.get('start_ms', 0) / 1000
            end_time = segment.get('end_ms', 0) / 1000
            st.markdown(f"**[{start_time:.2f}s → {end_time:.2f}s]**")
            text = segment.get('text', '')
            # Truncate long segments to 100 characters for the preview
            st.write(text[:100] + "..." if len(text) > 100 else text)
            # Divider between previewed segments, but not after the last one
            if i < len(preview_segments) - 1:
                st.divider()
        if len(filtered_segments) > 3:
            st.caption(f"... {'و' if is_ar else 'and'} {len(filtered_segments) - 3} {'مقاطع أخرى' if is_ar else 'more segments'}")

    # Export options
    col1, col2 = st.columns(2)
    with col1:
        format_options = {
            "📄 Word Document": "word",
            "📝 Google Docs": "google_docs",
        }
        selected_format = st.selectbox(
            "تنسيق التصدير" if is_ar else "Export Format",
            options=list(format_options.keys()),
            index=0,
        )
        st.session_state.export_format = format_options[selected_format]
    with col2:
        include_summary = st.checkbox(
            "تضمين الملخص" if is_ar else "Include Summary",
            value=True,
        )

    # Google Docs authentication section
    if st.session_state.export_format == 'google_docs':
        st.markdown("---")
        if google_auth.is_authenticated():
            st.success("✅ " + ("متصل بـ Google Docs" if is_ar else "Connected to Google Docs"))
            col_logout, col_info = st.columns([1, 2])
            with col_logout:
                if st.button("🚪 " + ("تسجيل خروج" if is_ar else "Logout")):
                    google_auth.logout()
                    st.rerun()
            with col_info:
                st.caption("سيتم إنشاء المستند في حسابك على Google" if is_ar else "Document will be created in your Google account")
        else:
            st.warning("🔐 " + ("يجب تسجيل الدخول إلى Google Docs أولاً" if is_ar else "Please authenticate with Google Docs first"))

            # Handle OAuth callback (authorization code arrives as ?code=...)
            auth_code = st.query_params.get("code")
            if auth_code:
                success, message = google_auth.handle_auth_callback(auth_code)
                if success:
                    st.success(message)
                    # Clear the code from URL
                    st.query_params.clear()
                    st.rerun()
                else:
                    st.error(message)

            # Show authentication button
            auth_url, error = google_auth.get_auth_url()
            if auth_url:
                # The URL is placed inside an HTML attribute, so '&' and quotes
                # must be escaped; the browser decodes them back when following
                # the link.
                safe_auth_url = html.escape(auth_url, quote=True)
                sign_in_label = "تسجيل الدخول إلى Google" if is_ar else "Sign in with Google"
                st.markdown(f"""
                <a href="{safe_auth_url}" target="_blank" rel="noopener">
                <button style="background-color: #4285f4; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer;">
                🔗 {sign_in_label}
                </button>
                </a>
                """, unsafe_allow_html=True)
                st.caption("سيتم فتح نافذة جديدة للمصادقة" if is_ar else "A new window will open for authentication")
            else:
                st.error(f"خطأ في إعداد Google API: {error}" if is_ar else f"Google API setup error: {error}")
                st.info("يرجى إعداد GOOGLE_CLIENT_ID و GOOGLE_CLIENT_SECRET في متغيرات البيئة" if is_ar else "Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables")

    # Export buttons
    col_export, col_cancel = st.columns(2)
    with col_export:
        # Disable export button if Google Docs is selected but not authenticated
        export_disabled = (st.session_state.export_format == 'google_docs' and not google_auth.is_authenticated())
        export_button_text = "🚀 تصدير" if is_ar else "🚀 Export"
        if st.button(export_button_text, type="primary", disabled=export_disabled):
            perform_export(filtered_segments, include_summary, google_auth)
    with col_cancel:
        if st.button("❌ إلغاء" if is_ar else "❌ Cancel"):
            st.session_state.show_export_modal = False
            st.rerun()
| # --- Export Execution Function --- | |
def perform_export(segments, include_summary=True, google_auth=None):
    """Run the export for ``segments`` and render the outcome in the UI.

    Builds an ``ExportConfig`` from session state (export timestamp, format,
    UI language, translation language), prepares the content with
    ``BroadcastExporter`` and calls ``export_with_fallback``. For the
    ``'word'`` format the result is opened as a file and offered through a
    download button; otherwise the result is rendered as a Google Docs link.
    Any exception is reported with ``st.error`` instead of propagating.

    Args:
        segments: Segments handed to ``prepare_export_content``.
        include_summary: Forwarded to ``ExportConfig``.
        google_auth: Forwarded to ``export_with_fallback``.

    NOTE(review): the branch below picks download vs. link from
    ``config.format_type`` only; if ``export_with_fallback`` can return a local
    file after a failed Google Docs export, it would be shown as a link.
    Confirm against the exporter.
    NOTE(review): the Close button is rendered from inside the caller's button
    handler, so its click is presumably not handled on the next rerun. Confirm.
    """
    arabic_ui = st.session_state.language == 'ar'
    try:
        # Initialize exporter
        exporter = BroadcastExporter(get_translator())

        # Create export configuration
        config = ExportConfig(
            export_timestamp=st.session_state.export_timestamp,
            format_type=st.session_state.export_format,
            include_summary=include_summary,
            ui_language=st.session_state.language,
            target_language=st.session_state.get('broadcast_translation_lang', 'ar'),
        )

        # Prepare export content
        content = exporter.prepare_export_content(segments, config)

        # Show progress using custom feedback
        feedback_placeholder = show_processing_feedback(
            "جاري التصدير..." if arabic_ui else "Exporting...",
            st.session_state.language,
        )
        try:
            # Perform export with fallback
            result, error = exporter.export_with_fallback(content, config, google_auth)
            # Remove the processing message
            feedback_placeholder.empty()
        except Exception:
            feedback_placeholder.empty()
            cleanup_processing_state()
            raise

        if error or not result:
            st.error(f"خطأ في التصدير: {error}" if arabic_ui else f"Export error: {error}")
        elif config.format_type == 'word':
            # Provide download link for Word document
            with open(result, 'rb') as docx_file:
                docx_bytes = docx_file.read()
            st.download_button(
                label="📥 تحميل الملف" if arabic_ui else "📥 Download File",
                data=docx_bytes,
                file_name=os.path.basename(result),
                mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            )
            st.success("تم إنشاء الملف بنجاح!" if arabic_ui else "File created successfully!")
        else:
            # Google Docs URL
            st.success("تم إنشاء المستند بنجاح!" if arabic_ui else "Document created successfully!")
            st.markdown(f"[فتح في Google Docs]({result})" if arabic_ui else f"[Open in Google Docs]({result})")
    except Exception as exc:
        st.error(f"خطأ غير متوقع: {str(exc)}" if arabic_ui else f"Unexpected error: {str(exc)}")

    # Close modal after export attempt
    if st.button("إغلاق" if arabic_ui else "Close"):
        st.session_state.show_export_modal = False
        st.rerun()
| # Note: external live slice helper removed to keep the app simple and fully local | |
| # --- Step 2: Review and Customize (REMOVED) --- | |
| # This section was removed as requested by user to simplify the interface | |
| # Results are now shown directly in show_processing_results() function | |
def reset_session():
    """Drop the per-run workflow keys from session state, then re-initialize it.

    Only the keys listed below are removed; every other entry in
    ``st.session_state`` is left untouched before
    ``initialize_session_state()`` runs.
    """
    log_to_browser_console("--- INFO: Resetting session state. ---")
    stale_keys = (
        'step',
        'audio_data',
        'transcription_data',
        'edited_text',
        'video_style',
        'new_recording',
    )
    for stale_key in stale_keys:
        # Keys that were never set are simply skipped
        if stale_key in st.session_state:
            del st.session_state[stale_key]
    initialize_session_state()
| # --- Entry Point --- | |
# Start the app only when run as a script and check_api_key() returns a truthy value.
if __name__ == "__main__" and check_api_key():
    initialize_session_state()
    main()