syncmaster8 / app.py
aseelflihan's picture
fix
ba4d300
# app.py - Refactored to eliminate recorder_server.py dependency
import streamlit as st
import os
import tempfile
import json
from pathlib import Path
import time
import traceback
import streamlit.components.v1 as components
import hashlib
from datetime import datetime
# from st_audiorec import st_audiorec # Import the new recorder component - OLD
# Reduce metrics/usage writes that can cause permission errors on hosted environments
try:
    # Turn off the 'browser.gatherUsageStats' Streamlit option.
    st.set_option('browser.gatherUsageStats', False)
except Exception:
    # Best-effort only: if the option cannot be set, continue starting up.
    pass
# Robust component declaration: prefer local build, else fall back to pip package
# Directory that contains this file; base for the bundled component path below.
parent_dir = os.path.dirname(os.path.abspath(__file__))
# Expected location of the locally built recorder frontend. It may not exist;
# st_audiorec() checks for it with os.path.isdir before using it.
build_dir = os.path.join(parent_dir, "custom_components/st-audiorec/st_audiorec/frontend/build")
def st_audiorec(key=None):
    """Render the audio recorder component and return its value.

    The locally built frontend in ``build_dir`` is used when that directory
    exists; otherwise the pip-installed ``st_audiorec`` package is tried.

    Args:
        key: Optional component key forwarded to the recorder.

    Returns:
        The component value, or ``None`` (after showing a warning) when no
        recorder could be initialised.
    """
    try:
        if not os.path.isdir(build_dir):
            # No local build on disk: fall back to the pip-installed component.
            try:
                from st_audiorec import st_audiorec as packaged_recorder
                return packaged_recorder(key=key)
            except Exception:
                st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
                return None

        local_recorder = components.declare_component("st_audiorec", path=build_dir)
        return local_recorder(key=key, default=0)
    except Exception:
        # Last-resort guard so a broken recorder never takes the page down.
        st.warning("Failed to initialize audio recorder component.")
        return None
# --- Critical Imports and Initial Checks ---
# Holds the AudioProcessor class when its import succeeds; stays None otherwise.
AUDIO_PROCESSOR_CLASS = None
# Formatted traceback of a failed import; main() displays it before stopping.
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    # Defer the failure: keep the traceback instead of crashing at import time.
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()
from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS
from exporter import BroadcastExporter, ExportConfig
from google_docs_config import google_docs_manager
from ai_questions import get_ai_question_engine, TextSelection
from style_fixes import apply_custom_styling, create_broadcast_bubble, create_white_container, create_processing_result_container
import requests
from dotenv import load_dotenv
# --- API Key Check ---
def check_api_key():
    """Report whether ``GEMINI_API_KEY`` is configured.

    Loads the ``.env`` file first. When the variable is missing or empty,
    an error and setup instructions are rendered on the page.

    Returns:
        ``True`` if the key is set, ``False`` otherwise.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True

    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
1. **Find the file named `.env.example`** in the `syncmaster2` directory.
2. **Rename it to `.env`**.
3. **Open the `.env` file** with a text editor.
4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
6. **Save the file and restart the application.**
""")
    return False
# --- Summary Helper (robust to cached translator without summarize_text) ---
def generate_summary(text: str, target_language: str = 'ar'):
    """Summarise ``text`` in ``target_language``, degrading gracefully.

    The translator's ``summarize_text`` is used when the (possibly cached)
    instance provides it. If it is missing or yields nothing, an Arabic
    summary is produced and, for non-Arabic targets, translated.

    Args:
        text: Text to summarise; ``None`` is treated as an empty string.
        target_language: Language code of the desired summary.

    Returns:
        A ``(summary, error)`` pair. ``error`` is ``None`` on success; on an
        exception the pair is ``(None, str(exception))``.
    """
    engine = get_translator()
    try:
        source_text = text or ''

        direct_summarize = getattr(engine, 'summarize_text', None)
        if callable(direct_summarize):
            summary, _ = direct_summarize(source_text, target_language=target_language)
            if summary:
                return summary, None

        # Fallback: summarise in Arabic, then translate if another language is wanted.
        arabic_summary, arabic_error = engine.summarize_text_arabic(source_text)
        wants_translation = target_language and target_language != 'ar' and arabic_summary
        if not wants_translation:
            return arabic_summary, arabic_error

        translated, translation_error = engine.translate_text(
            arabic_summary, target_language=target_language
        )
        if translated:
            return translated, None
        return arabic_summary, translation_error
    except Exception as exc:
        return None, str(exc)
# --- Custom Feedback Functions (No Dimming Effect) ---
def show_processing_feedback(message: str, language: str = 'en'):
    """Show a processing notice without dimming the page.

    ``st.info`` is used instead of ``st.spinner`` to avoid the dimming
    effect.

    Args:
        message: Text to display after the 🔄 icon.
        language: Kept for backward compatibility. The rendered text is the
            same for every language, so the value is not used.

    Returns:
        The object returned by ``st.info``; callers call ``.empty()`` on it
        to remove the notice.
    """
    # The original built the same string in both language branches.
    return st.info(f"🔄 {message}")
def show_status_update(message: str, status_type: str = 'info', language: str = 'en'):
    """Show a status message with a matching icon, without dimming the page.

    Args:
        message: Text to display after the icon.
        status_type: ``'success'``, ``'error'`` or ``'warning'``; any other
            value is rendered as an info message.
        language: Accepted for interface compatibility; not used.
    """
    # Map each status to its emoji and the Streamlit call that renders it.
    renderers = {
        'success': ("✅", st.success),
        'error': ("❌", st.error),
        'warning': ("⚠️", st.warning),
    }
    icon, render = renderers.get(status_type, ("ℹ️", st.info))
    render(f"{icon} {message}")
def cleanup_processing_state():
    """Reset processing bookkeeping in the session after an error.

    If ``processing_state`` exists it is replaced with an idle record, and
    every entry of ``processing_status`` still marked ``'processing'`` or
    ``'queued'`` is switched to ``'failed'``.
    """
    session = st.session_state

    if 'processing_state' in session:
        session.processing_state = {
            'is_processing': False,
            'current_task': '',
            'progress': 0.0,
            'message': '',
            'language': session.get('language', 'en')
        }

    # Mark any other unfinished task as failed.
    if 'processing_status' in session:
        unfinished = ['processing', 'queued']
        for task_id in list(session.processing_status.keys()):
            if session.processing_status[task_id] in unfinished:
                session.processing_status[task_id] = 'failed'
def show_processing_progress(message: str, progress: float = None, language: str = 'en'):
    """Show a processing notice plus a progress bar, without dimming the page.

    Args:
        message: Text to display after the 🔄 icon.
        progress: Initial progress value passed to ``st.progress``. ``None``
            starts the bar at 0.
        language: Kept for backward compatibility. The rendered text is the
            same for every language, so the value is not used.

    Returns:
        The progress bar object returned by ``st.progress``.
    """
    # Both language branches of the original produced this same string.
    st.info(f"🔄 {message}")
    # Unknown progress starts the bar at zero.
    return st.progress(0 if progress is None else progress)
# --- Page Configuration ---
# Configure the browser tab title and icon, and use the wide page layout.
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"
)
# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Inject JavaScript that prints backend log messages to the browser console.

    Each message is routed to ``console.error``, ``console.warn``,
    ``console.debug`` or ``console.log`` according to the ``--- ERROR`` /
    ``--- FATAL`` / ``--- WARNING`` / ``--- DEBUG`` marker it contains.

    Args:
        messages: A single string or an iterable of messages. Non-string
            items are converted with ``str``. ``None`` is treated as an
            empty list.
    """
    if messages is None:
        messages = []
    elif isinstance(messages, str):
        messages = [messages]

    # json.dumps yields a valid JS string literal but leaves "<" untouched, so a
    # log containing "</script>" would close the injected tag early. Writing "<"
    # as \u003c keeps the literal's value identical while hiding it from the
    # HTML parser.
    escaped_messages = [
        json.dumps(str(msg)).replace("<", "\\u003c") for msg in messages
    ]
    js_code = f"""
<script>
(function() {{
const logs = [{', '.join(escaped_messages)}];
console.group("Backend Logs from SyncMaster");
logs.forEach(log => {{
const content = String(log);
if (content.includes('--- ERROR') || content.includes('--- FATAL')) {{
console.error(log);
}} else if (content.includes('--- WARNING')) {{
console.warn(log);
}} else if (content.includes('--- DEBUG')) {{
console.debug(log);
}} else {{
console.log(log);
}}
}});
console.groupEnd();
}})();
</script>
"""
    components.html(js_code, height=0, scrolling=False)
# --- AI Models Reset Function ---
def reset_ai_models():
    """Drop cached AI state so translator and question engines are rebuilt.

    Steps:
      1. Delete every session-state key whose lowercase name contains one
         of the cache-related terms below.
      2. Reload environment variables from ``.env``, overriding current
         values.
      3. Remove the ``translator``, ``ai_questions`` and ``exporter``
         modules from ``sys.modules``.
      4. Best-effort: clear the module-level singleton attributes
         ``translator.translator_instance`` and
         ``ai_questions.ai_question_engine`` when they exist.
    """
    import sys

    cache_terms = ('translator', 'ai', 'question', 'model', 'engine', 'processing')
    stale_keys = [
        key
        for key in list(st.session_state.keys())
        if any(term in key.lower() for term in cache_terms)
    ]
    for key in stale_keys:
        del st.session_state[key]

    # Force reload environment variables.
    load_dotenv(override=True)

    # pop() with a default replaces the "if present: try: del ... except: pass" dance.
    for module_name in ('translator', 'ai_questions', 'exporter'):
        sys.modules.pop(module_name, None)

    # Clearing the singletons is best-effort: a module that fails to import
    # must not break the reset. Only Exception is caught (the original used
    # bare "except:") so KeyboardInterrupt/SystemExit still propagate.
    try:
        import translator
        if hasattr(translator, 'translator_instance'):
            translator.translator_instance = None
    except Exception:
        pass

    try:
        import ai_questions
        if hasattr(ai_questions, 'ai_question_engine'):
            ai_questions.ai_question_engine = None
    except Exception:
        pass
# --- Session State Initialization ---
def initialize_session_state():
    """Seed ``st.session_state`` with defaults for every key the app uses.

    Keys that already exist are left untouched. The defaults table is
    rebuilt on each call, so mutable defaults (lists, dicts, sets) are
    never shared between sessions.
    """
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,
        # Transcript feed (newest first) and the id set used for de-duplication.
        'transcript_feed': [],  # list of {id, ts, text}
        'transcript_ids': set(),
        # Incremental broadcast state.
        'broadcast_segments': [],  # [{id, recording_id, start_ms, end_ms, checksum, text}]
        'lastFetchedEnd_ms': 0,
        # Broadcast translation target, separate from the general translation target.
        'broadcast_translation_lang': 'ar',
        'summary_language': 'ar',
        'auto_generate_summary': True,
        # Export functionality state.
        'export_timestamp': None,
        'show_export_modal': False,
        'export_format': 'word',
        # AI Questions functionality state.
        'selected_text': None,
        'selected_segment_id': None,
        'show_question_modal': False,
        'current_question_session': None,
        'preferred_ai_model': 'auto',
        'preferred_answer_language': 'auto',
        # Background processing state.
        'processing_queue': [],
        'processing_results': {},
        'processing_status': {},
    }
    for name, value in defaults.items():
        if name not in st.session_state:
            st.session_state[name] = value
# --- Background Audio Processing Function ---
def queue_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Add an audio clip to the session's processing queue.

    Args:
        audio_bytes: Raw audio content to process later.
        original_filename: File name stored with the task.

    Returns:
        The task id: the first eight characters of a random UUID4 string.
    """
    import uuid

    task_id = str(uuid.uuid4())[:8]

    st.session_state.processing_queue.append({
        'id': task_id,
        'audio_bytes': audio_bytes,
        'filename': original_filename,
        'timestamp': time.time(),
        'status': 'queued'
    })
    st.session_state.processing_status[task_id] = 'queued'

    # Immediate on-page confirmation in the interface language.
    if st.session_state.language == 'ar':
        queued_label = 'تم إضافة التسجيل للمعالجة'
    else:
        queued_label = 'Audio queued for processing'
    st.info(f"🔄 {queued_label} (ID: {task_id})")
    return task_id
def process_queued_audio():
    """Process the task at the head of the queue, then remove it.

    Does nothing when the queue is empty. The outcome is written to the
    task's ``status`` and to ``processing_status``; a truthy result is
    stored in ``processing_results`` under the task id. Exceptions raised
    while processing are reported on the page instead of propagating.
    """
    session = st.session_state
    if not session.processing_queue:
        return

    current = session.processing_queue[0]
    task_id = current['id']
    ui_lang = session.language
    arabic = ui_lang == 'ar'

    def set_status(value):
        # Keep the status map and the task record in step.
        session.processing_status[task_id] = value
        current['status'] = value

    set_status('processing')

    # A plain info notice is used here rather than st.status.
    notice = show_processing_feedback(
        f"{'معالجة التسجيل' if arabic else 'Processing audio'} {task_id}...",
        ui_lang,
    )

    try:
        outcome = run_audio_processing_sync(current['audio_bytes'], current['filename'])
        # Remove the processing notice.
        notice.empty()
        if outcome:
            session.processing_results[task_id] = outcome
            set_status('completed')
            show_status_update(
                f"{'تم الانتهاء من المعالجة' if arabic else 'Processing completed'} {task_id}",
                'success',
                ui_lang,
            )
        else:
            set_status('failed')
            show_status_update(
                f"{'فشلت المعالجة' if arabic else 'Processing failed'} {task_id}",
                'error',
                ui_lang,
            )
    except Exception as exc:
        # Remove the processing notice on failure as well.
        notice.empty()
        set_status('failed')
        show_status_update(
            f"{'خطأ في المعالجة' if arabic else 'Processing error'} {task_id}: {str(exc)}",
            'error',
            ui_lang,
        )
        cleanup_processing_state()

    # The head task is finished either way: drop it from the queue.
    session.processing_queue.pop(0)
# --- Centralized Audio Processing Function ---
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Dispatch audio to queued or immediate processing.

    When the ``background_processing`` session flag is truthy (the default
    if unset) the audio is queued and the task id is returned. Otherwise
    it is processed right away and the processing result is returned.
    """
    use_queue = st.session_state.get('background_processing', True)
    handler = queue_audio_processing if use_queue else run_audio_processing_sync
    return handler(audio_bytes, original_filename)
def _append_to_transcript_feed(audio_bytes, text):
    """Prepend ``text`` to the transcript feed and rebuild ``edited_text``.

    Entries are de-duplicated by the MD5 digest of the audio bytes; a
    timestamp-based id is used if hashing fails.
    """
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        digest = f"snap-{int(time.time()*1000)}"
    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0, {"id": digest, "ts": int(time.time() * 1000), "text": text}
        )
    # Rebuild edited_text with the newest entry first.
    st.session_state.edited_text = "\n\n".join(
        entry["text"] for entry in st.session_state.transcript_feed
    )


def _call_processor_with_feedback(message, processor_call, *args):
    """Run a processor call while an on-page notice is shown.

    Returns the ``(result, logs)`` pair produced by ``processor_call``. On
    failure the notice is removed, processing state is cleaned up, and the
    original exception is re-raised with its traceback intact.
    """
    # A custom notice is used instead of st.spinner.
    notice = show_processing_feedback(message, st.session_state.language)
    try:
        result, logs = processor_call(*args)
    except Exception:
        notice.empty()
        cleanup_processing_state()
        raise
    notice.empty()
    return result, logs


def run_audio_processing_sync(audio_bytes, original_filename="recorded_audio.wav"):
    """Transcribe (and optionally translate) audio immediately.

    The bytes are written to a temporary file, passed to the audio
    processor, and the outcome is stored in
    ``st.session_state.transcription_data`` and the transcript feed. The
    temporary file is always removed.

    Args:
        audio_bytes: Raw audio content.
        original_filename: Name whose suffix is used for the temporary file.

    Returns:
        A dict with ``original_text``, ``translated_text``,
        ``detected_language``, ``translation_success`` and
        ``word_timestamps``. ``None`` is returned when no audio was given,
        when the processor produced nothing usable, or when an error
        occurred (the error is shown on the page and never raised).
    """
    # This is the classic, non-Custom path; ensure editor sections are enabled.
    st.session_state['_custom_active'] = False
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return None

    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        suffix = Path(original_filename).suffix
        # delete=False: the file is closed before the processor opens it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        processor = AUDIO_PROCESSOR_CLASS()
        english_ui = st.session_state.language == 'en'

        if st.session_state.enable_translation:
            message = (
                "Performing AI Transcription & Translation... please wait."
                if english_ui
                else "جاري تنفيذ النسخ والترجمة بالذكاء الاصطناعي... يرجى الانتظار."
            )
            result_data, processor_logs = _call_processor_with_feedback(
                message,
                processor.get_word_timestamps_with_translation,
                tmp_file_path,
                st.session_state.target_language,
            )
            log_to_browser_console(processor_logs)
            if not result_data or not result_data.get("original_text"):
                st.warning(
                    "Could not generate transcription with translation. Check browser console (F12) for logs."
                )
                return None
            st.session_state.transcription_data = {
                "text": result_data["original_text"],
                "translated_text": result_data["translated_text"],
                "word_timestamps": result_data["word_timestamps"],
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": result_data.get("translation_success", False),
                "detected_language": result_data.get("language_detected", "unknown"),
            }
            _append_to_transcript_feed(audio_bytes, result_data["original_text"])
            outcome = {
                'original_text': result_data.get("original_text"),
                'translated_text': result_data.get("translated_text"),
                # Same "unknown" default as the session copy above.
                'detected_language': result_data.get("language_detected", "unknown"),
                'translation_success': result_data.get("translation_success", False),
                'word_timestamps': result_data.get("word_timestamps"),
            }
        else:
            # Standard processing without translation.
            message = (
                "Performing AI Transcription... please wait."
                if english_ui
                else "جاري تنفيذ النسخ بالذكاء الاصطناعي... يرجى الانتظار."
            )
            word_timestamps, processor_logs = _call_processor_with_feedback(
                message, processor.get_word_timestamps, tmp_file_path
            )
            log_to_browser_console(processor_logs)
            if not word_timestamps:
                st.warning(
                    "Could not generate timestamps. Check browser console (F12) for logs."
                )
                return None
            full_text = " ".join(d["word"] for d in word_timestamps)
            st.session_state.transcription_data = {
                "text": full_text,
                "word_timestamps": word_timestamps,
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": False,
            }
            _append_to_transcript_feed(audio_bytes, full_text)
            outcome = {
                'original_text': full_text,
                'translated_text': None,
                'detected_language': "unknown",
                'translation_success': False,
                'word_timestamps': word_timestamps,
            }

        st.session_state.step = 1  # Keep it on the same step
        # Returned so the background queue can store the result.
        return outcome
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
        return None
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
# --- Main Application Logic ---
def main():
    """Render the SyncMaster page.

    In order: apply styling, reload environment variables, initialise
    session state, draw the sidebar (language, translation, Google account,
    AI model status and switches, processing status), draw the recording
    settings, run the upload/record step, process one queued audio task,
    and finally show results and modals when their flags are set.
    """
    # Apply custom styling first
    apply_custom_styling()
    # Force reload environment variables
    load_dotenv(override=True)
    # Clear AI models cache the first time main() runs in a session
    if 'app_initialized' not in st.session_state:
        reset_ai_models()
        st.session_state.app_initialized = True
    initialize_session_state()
    st.markdown("""
<style>
.main .block-container { animation: fadeIn 0.2s ease-in-out; }
@keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } }
.block-container { padding-top: 1rem; }
</style>
""", unsafe_allow_html=True)
    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        language_options = {'English': 'en', 'العربية': 'ar'}
        selected_lang_display = st.selectbox(
            "Interface Language",
            options=list(language_options.keys()),
            index=0 if st.session_state.language == 'en' else 1
        )
        st.session_state.language = language_options[selected_lang_display]
        st.markdown("## 🔤 Translation Settings")
        st.session_state.enable_translation = st.checkbox(
            "Enable AI Translation" if st.session_state.language == 'en' else "تفعيل الترجمة بالذكاء الاصطناعي",
            value=st.session_state.enable_translation,
            help="Automatically translate transcribed text" if st.session_state.language == 'en' else "ترجمة النص تلقائياً"
        )
        if st.session_state.enable_translation:
            target_lang_options = {
                'Arabic (العربية)': 'ar', 'English': 'en', 'French (Français)': 'fr', 'Spanish (Español)': 'es'
            }
            selected_target = st.selectbox(
                "Target Language" if st.session_state.language == 'en' else "اللغة المستهدفة",
                options=list(target_lang_options.keys()), index=0
            )
            st.session_state.target_language = target_lang_options[selected_target]
        # Auto summary toggle
        st.session_state.auto_generate_summary = st.checkbox(
            "Auto-generate Arabic summary" if st.session_state.language == 'en' else "توليد الملخص العربي تلقائياً",
            value=st.session_state.auto_generate_summary
        )
        # Google Account Status
        st.markdown("## 🔐 Google Account")
        if google_docs_manager.is_authenticated():
            st.success("✅ متصل" if st.session_state.language == 'ar' else "✅ Connected")
        else:
            st.info("🔒 غير متصل" if st.session_state.language == 'ar' else "🔒 Not connected")
        # AI Questions Status
        st.markdown("## 🤖 AI Questions")
        # Show preferred model
        if st.session_state.preferred_ai_model != 'auto':
            st.info(f"🎯 {'النموذج المفضل' if st.session_state.language == 'ar' else 'Preferred Model'}: {st.session_state.preferred_ai_model}")
        else:
            st.info("🔄 " + ("تلقائي" if st.session_state.language == 'ar' else "Auto selection"))
        # Model reset button
        if st.button("🔄 " + ("إعادة تعيين النماذج" if st.session_state.language == 'ar' else "Reset AI Models"), help="إعادة تحميل نماذج الذكاء الاصطناعي" if st.session_state.language == 'ar' else "Reload AI models"):
            reset_ai_models()
            st.success("✅ " + ("تم إعادة تعيين النماذج" if st.session_state.language == 'ar' else "AI models reset successfully"))
            st.rerun()
        # Show preferred answer language
        answer_lang = st.session_state.get('preferred_answer_language', 'auto')
        if answer_lang != 'auto':
            lang_names = {'ar': '🇸🇦 العربية', 'en': '🇺🇸 English', 'fr': '🇫🇷 Français', 'es': '🇪🇸 Español', 'de': '🇩🇪 Deutsch', 'zh': '🇨🇳 中文'}
            lang_display = lang_names.get(answer_lang, answer_lang)
            st.info(f"🌐 {'لغة الإجابة' if st.session_state.language == 'ar' else 'Answer Language'}: {lang_display}")
        else:
            # 'auto' is displayed as the current interface language.
            current_ui_lang = "🇸🇦 العربية" if st.session_state.language == 'ar' else "🇺🇸 English"
            st.info(f"🌐 {'لغة الإجابة' if st.session_state.language == 'ar' else 'Answer Language'}: {current_ui_lang} ({'تلقائي' if st.session_state.language == 'ar' else 'Auto'})")
        # Test AI services availability
        translator = get_translator()
        services_status = {}
        # Test Gemini
        # NOTE(review): this issues a generate_content("Test") call every time
        # main() runs; nothing here caches the outcome. The response itself is
        # unused - only success or failure matters.
        try:
            if hasattr(translator, 'model') and translator.model:
                test_response = translator.model.generate_content("Test")
                services_status['Gemini'] = "✅"
            else:
                services_status['Gemini'] = "❌"
        except Exception as e:
            error_str = str(e)
            # An error mentioning "429" or "quota" is shown as a warning, not a failure.
            if "429" in error_str or "quota" in error_str.lower():
                services_status['Gemini'] = "⚠️"
            else:
                services_status['Gemini'] = "❌"
        # Test Groq (only checks that the helper and an API key exist; no request is made)
        try:
            if hasattr(translator, '_groq_complete') and translator.groq_api_key:
                services_status['Groq'] = "✅"
            else:
                services_status['Groq'] = "❌"
        except Exception:
            services_status['Groq'] = "❌"
        # Test OpenRouter (same presence-only check as Groq)
        try:
            if hasattr(translator, '_openrouter_complete') and translator.openrouter_api_key:
                services_status['OpenRouter'] = "✅"
            else:
                services_status['OpenRouter'] = "❌"
        except Exception:
            services_status['OpenRouter'] = "❌"
        # Display services status
        st.markdown("**" + ("حالة النماذج" if st.session_state.language == 'ar' else "Models Status") + ":**")
        for service, status in services_status.items():
            if status == "✅":
                st.success(f"{status} {service}")
            elif status == "⚠️":
                st.warning(f"{status} {service} (حد يومي)" if st.session_state.language == 'ar' else f"{status} {service} (quota)")
            else:
                st.error(f"{status} {service}")
        # Overall status
        # NOTE(review): the "/3" below is hard-coded to the three services probed above.
        available_count = sum(1 for status in services_status.values() if status == "✅")
        if available_count > 0:
            st.info(f"🤖 {available_count}/3 " + ("نماذج متاحة" if st.session_state.language == 'ar' else "models available"))
        else:
            st.warning("⚠️ جميع النماذج غير متاحة" if st.session_state.language == 'ar' else "⚠️ All models unavailable")
        # Show conversation status and usage stats
        question_engine = get_ai_question_engine()
        usage_stats = question_engine.get_model_usage_stats()
        total_questions = sum(usage_stats.values())
        if st.session_state.current_question_session:
            session = question_engine.get_conversation_history(st.session_state.current_question_session)
            if session and session.conversation:
                st.info(f"💬 {len(session.conversation)} " + ("أسئلة نشطة" if st.session_state.language == 'ar' else "active questions"))
            else:
                st.info("🤖 جاهز للأسئلة" if st.session_state.language == 'ar' else "🤖 Ready for questions")
        else:
            st.info("🤖 جاهز للأسئلة" if st.session_state.language == 'ar' else "🤖 Ready for questions")
        # Show usage statistics
        if total_questions > 0:
            with st.expander("📊 إحصائيات الاستخدام" if st.session_state.language == 'ar' else "📊 Usage Statistics"):
                st.write(f"**{'إجمالي الأسئلة' if st.session_state.language == 'ar' else 'Total Questions'}: {total_questions}**")
                for model, count in usage_stats.items():
                    if count > 0:
                        percentage = (count / total_questions) * 100
                        st.write(f"• {model}: {count} ({percentage:.1f}%)")
        else:
            st.caption("📊 لا توجد إحصائيات بعد" if st.session_state.language == 'ar' else "📊 No statistics yet")
        # Quick model switcher
        st.markdown("**" + ("تبديل سريع للنموذج" if st.session_state.language == 'ar' else "Quick Model Switch") + "**")
        # NOTE(review): the engine was already fetched above; this second call looks redundant.
        question_engine = get_ai_question_engine()
        models_status = question_engine.check_model_availability()
        # Create buttons for each available model
        cols = st.columns(2)
        model_names = ['auto', 'Gemini AI', 'Groq AI', 'OpenRouter AI']
        for i, model in enumerate(model_names):
            with cols[i % 2]:
                if model == 'auto':
                    button_text = "🔄 تلقائي" if st.session_state.language == 'ar' else "🔄 Auto"
                    is_current = st.session_state.preferred_ai_model == 'auto'
                else:
                    status_info = models_status.get(model, {})
                    icon = status_info.get('icon', '❓')
                    button_text = f"{icon} {model.split()[0]}"  # Show first word + icon
                    is_current = st.session_state.preferred_ai_model == model
                # The currently selected model is drawn as the primary button.
                button_type = "primary" if is_current else "secondary"
                if st.button(button_text, key=f"switch_{model}", type=button_type, use_container_width=True):
                    st.session_state.preferred_ai_model = model
                    st.rerun()
        # Quick language switcher
        st.markdown("**" + ("تبديل سريع للغة" if st.session_state.language == 'ar' else "Quick Language Switch") + "**")
        language_buttons = {
            'auto': "🔄 تلقائي" if st.session_state.language == 'ar' else "🔄 Auto",
            'ar': "🇸🇦 عربي",
            'en': "🇺🇸 EN",
            'fr': "🇫🇷 FR",
            'es': "🇪🇸 ES"
        }
        cols_lang = st.columns(3)
        for i, (lang_code, button_text) in enumerate(language_buttons.items()):
            with cols_lang[i % 3]:
                is_current_lang = st.session_state.get('preferred_answer_language', 'auto') == lang_code
                button_type_lang = "primary" if is_current_lang else "secondary"
                if st.button(button_text, key=f"switch_lang_{lang_code}", type=button_type_lang, use_container_width=True):
                    st.session_state.preferred_answer_language = lang_code
                    st.rerun()
        # Processing status
        if st.session_state.processing_queue or st.session_state.processing_results:
            st.markdown("## 🔄 " + ("حالة المعالجة" if st.session_state.language == 'ar' else "Processing Status"))
            # Queue status
            if st.session_state.processing_queue:
                queue_count = len(st.session_state.processing_queue)
                st.warning(f"⏳ {queue_count} " + ("في الانتظار" if st.session_state.language == 'ar' else "in queue"))
            # Results count
            if st.session_state.processing_results:
                results_count = len(st.session_state.processing_results)
                st.success(f"✅ {results_count} " + ("مكتمل" if st.session_state.language == 'ar' else "completed"))
            # Clear all button
            if st.button("🗑️ " + ("مسح الكل" if st.session_state.language == 'ar' else "Clear All")):
                st.session_state.processing_queue = []
                st.session_state.processing_results = {}
                st.session_state.processing_status = {}
                st.rerun()
    st.title("🎵 SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")
    # Simplified interface - removed step indicators as requested
    # Global settings for long recording retention and custom snapshot duration
    with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False):
        st.session_state.setdefault('retention_minutes', 30)
        # 0 means: use full buffer by default for Custom
        st.session_state.setdefault('custom_snapshot_seconds', 0)
        # Auto-Custom interval seconds (for frontend auto trigger)
        st.session_state.setdefault('auto_custom_interval_sec', 10)
        # Auto-start incremental snapshots when recording begins
        st.session_state.setdefault('auto_start_custom', False)
        st.session_state.retention_minutes = st.number_input("Retention window (minutes)", min_value=5, max_value=240, value=st.session_state.retention_minutes)
        st.session_state.custom_snapshot_seconds = st.number_input("Custom snapshot (seconds; 0 = full buffer)", min_value=0, max_value=3600, value=st.session_state.custom_snapshot_seconds)
        st.session_state.auto_custom_interval_sec = st.number_input("Auto Custom interval (seconds)", min_value=1, max_value=3600, value=st.session_state.auto_custom_interval_sec, help="How often to auto-trigger the same Custom action while recording.")
        st.session_state.auto_start_custom = st.checkbox("Auto-start incremental snapshots on record", value=st.session_state.auto_start_custom, help="Start sending Custom intervals automatically as soon as you start recording.")
    # Inject globals into the page for the component to pick up
    # NOTE(review): the .get() default for 'auto_start_custom' is True here but
    # the setdefault above uses False; the key is always set by this point, so
    # the True default is never used.
    components.html(f"""
<script>
window.ST_AREC_RETENTION_MINUTES = {int(st.session_state.retention_minutes)};
window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS = {int(st.session_state.custom_snapshot_seconds)};
window.ST_AREC_LAST_FETCHED_END_MS = {int(st.session_state.get('lastFetchedEnd_ms', 0))};
window.ST_AREC_CUSTOM_AUTO_INTERVAL_SECONDS = {int(st.session_state.get('auto_custom_interval_sec', 10))};
window.ST_AREC_AUTO_START = {str(bool(st.session_state.get('auto_start_custom', True))).lower()};
console.log('Recorder config', window.ST_AREC_RETENTION_MINUTES, window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS);
</script>
""", height=0)
    # Without AudioProcessor nothing below can work: show the import traceback and stop.
    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()
    step_1_upload_and_process()
    # Process background queue (process_queued_audio handles one task per call)
    if st.session_state.get('background_processing', True) and st.session_state.processing_queue:
        process_queued_audio()
    # Show processing results optionally
    if st.session_state.get('show_processing_results', False):
        show_processing_results()
    elif st.session_state.processing_results:
        # Show a button to view results if there are any
        if st.button("📝 " + ("عرض نتائج المعالجة" if st.session_state.language == 'ar' else "Show Processing Results") + f" ({len(st.session_state.processing_results)})", type="secondary"):
            st.session_state.show_processing_results = True
            st.rerun()
    # Note: step_2_review_and_customize removed as requested
    # Results are now shown in show_processing_results() function
    # AI Question modal (show outside of other components)
    if st.session_state.show_question_modal:
        show_question_modal()
    # Export modal (show outside of other components)
    if st.session_state.show_export_modal:
        show_export_modal()
# --- Show Processing Results ---
def show_processing_results():
    """Render every stored processing result inline on the current page.

    Draws a header with the result count and a hide button, an optional
    combined view (offered only when more than one result exists), and one
    expander per result with copy and delete actions. Does nothing when
    ``st.session_state.processing_results`` is empty.
    """
    results = st.session_state.processing_results
    if not results:
        return

    def pick(arabic, english):
        # Select the UI string that matches the current interface language.
        return arabic if st.session_state.language == 'ar' else english

    st.markdown("---")

    # Header row: title with count on the left, hide button on the right.
    col_header, col_close = st.columns([4, 1])
    with col_header:
        results_count = len(results)
        st.subheader(f"📝 {pick('نتائج المعالجة', 'Processing Results')} ({results_count})")
    with col_close:
        hide_clicked = st.button("❌ " + pick("إخفاء", "Hide"), key="hide_results")
        if hide_clicked:
            st.session_state.show_processing_results = False
            st.rerun()

    if results_count > 1:
        # Optional view that concatenates all texts / translations.
        show_all = st.checkbox(
            pick("عرض جميع النتائج مجمعة", "Show all results combined"),
            help=pick("عرض جميع النصوص والترجمات في مكان واحد", "Display all texts and translations in one place"),
        )
        if show_all:
            st.markdown("### " + pick("النصوص الأصلية مجمعة", "Combined Original Texts"))
            original_parts = [
                entry.get('original_text', '')
                for entry in results.values()
                if entry.get('original_text')
            ]
            combined_original = "\n\n".join(original_parts)
            if combined_original:
                st.write(combined_original)
                if st.button("📋 " + pick("نسخ جميع النصوص", "Copy All Texts")):
                    st.code(combined_original, language=None)

            st.markdown("### " + pick("الترجمات مجمعة", "Combined Translations"))
            translation_parts = [
                entry.get('translated_text', '')
                for entry in results.values()
                if entry.get('translated_text')
            ]
            combined_translation = "\n\n".join(translation_parts)
            if combined_translation:
                st.write(combined_translation)
                if st.button("📋 " + pick("نسخ جميع الترجمات", "Copy All Translations")):
                    st.code(combined_translation, language=None)
            st.markdown("---")

    # One expander per completed task.
    for task_id, result in results.items():
        expander_title = f"🎵 {pick('التسجيل', 'Recording')} {task_id}"
        with st.expander(expander_title, expanded=True):
            # Original text in a white container.
            if result.get('original_text'):
                original_html = create_white_container(
                    pick("النص الأصلي", "Original Text"), result['original_text'], "📝"
                )
                st.markdown(original_html, unsafe_allow_html=True)

            # Translation in a white container.
            if result.get('translated_text'):
                translation_html = create_white_container(
                    pick("الترجمة", "Translation"), result['translated_text'], "🌐"
                )
                st.markdown(translation_html, unsafe_allow_html=True)

            # Detected language, when the result carries one.
            if result.get('detected_language'):
                st.caption(f"🌐 {pick('اللغة المكتشفة', 'Detected language')}: {result['detected_language']}")

            # Action buttons: copy original, copy translation, delete.
            copy_col, translation_col, delete_col = st.columns(3)
            with copy_col:
                if st.button(f"📋 {pick('نسخ النص', 'Copy Text')}", key=f"copy_original_{task_id}"):
                    st.code(result.get('original_text', ''), language=None)
                    st.success("✅ " + pick("تم تنسيق النص للنسخ", "Text formatted for copying"))
            with translation_col:
                # The button is only created when a translation exists.
                if result.get('translated_text'):
                    if st.button(f"📋 {pick('نسخ الترجمة', 'Copy Translation')}", key=f"copy_translation_{task_id}"):
                        st.code(result.get('translated_text', ''), language=None)
                        st.success("✅ " + pick("تم تنسيق الترجمة للنسخ", "Translation formatted for copying"))
            with delete_col:
                if st.button(f"🗑️ {pick('حذف', 'Delete')}", key=f"delete_{task_id}"):
                    del st.session_state.processing_results[task_id]
                    if task_id in st.session_state.processing_status:
                        del st.session_state.processing_status[task_id]
                    # Rerun right away so iteration never continues over the mutated dict.
                    st.rerun()
# --- Step 1: Upload and Process ---
def _trim_wav_prefix(wav_bytes, sample_rate, drop_ms):
    """Return ``wav_bytes`` with the first ``drop_ms`` milliseconds of audio removed.

    Only a canonical 44-byte RIFF/WAVE header is understood; any other input
    (or a non-positive ``drop_ms``) is returned unchanged. The frame size is
    read from the header's BlockAlign field (offset 32) and falls back to
    2 bytes (PCM16 mono) when that field is zero. The RIFF chunk size and the
    data chunk size in the header are rewritten to match the trimmed payload.
    """
    if drop_ms <= 0:
        return wav_bytes
    if len(wav_bytes) < 44 or wav_bytes[0:4] != b'RIFF' or wav_bytes[8:12] != b'WAVE':
        # Not a recognizable WAV header; keep as-is.
        return wav_bytes
    block_align = int.from_bytes(wav_bytes[32:34], 'little') or 2
    drop_frames = int(sample_rate * (drop_ms / 1000.0))
    # Slicing past the end simply yields an empty payload.
    pcm_trim = wav_bytes[44:][drop_frames * block_align:]
    header = bytearray(wav_bytes[:44])
    # ChunkSize at offset 4 = 36 + Subchunk2Size; Subchunk2Size at offset 40.
    header[4:8] = (36 + len(pcm_trim)).to_bytes(4, 'little')
    header[40:44] = len(pcm_trim).to_bytes(4, 'little')
    return bytes(header) + pcm_trim


def _step1_transcribe_wav(wav_bytes):
    """Transcribe ``wav_bytes`` through a temporary file; return ``(text, model_used)``.

    Word timestamps are tried first; if they yield no words, plain
    transcription is used as a fallback. The temporary file is always removed.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tf:
        tf.write(wav_bytes)
        tmp_path = tf.name
    try:
        processor = AUDIO_PROCESSOR_CLASS()
        word_timestamps, _processor_logs, model_used = processor.get_word_timestamps(tmp_path)
        full_text = " ".join([d['word'] for d in word_timestamps]) if word_timestamps else ""
        # Fallback: if timestamp extraction yielded no words, try plain transcription.
        if not full_text:
            plain_text, _err, fallback_model = processor.transcribe_audio(tmp_path)
            if plain_text:
                full_text = plain_text.strip()
                model_used = fallback_model
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
    return full_text, model_used


def _step1_handle_interval_payload(payload):
    """Process an ``interval_wav`` / ``no_new`` payload dict from the recorder component.

    New, non-overlapping audio is transcribed and appended to
    ``st.session_state.broadcast_segments`` and to the transcript feed; the
    script is then rerun so the new segment shows up immediately.
    """
    # Mark Custom interval flow active so Step 2 editor/style can be hidden.
    st.session_state['_custom_active'] = True
    if payload['type'] == 'no_new':
        st.info("No new audio chunks yet.")
        return

    # Extract interval audio.
    b = bytes(payload['bytes'])
    sr = int(payload.get('sr', 16000))
    start_ms = int(payload['start_ms'])
    end_ms = int(payload['end_ms'])
    if end_ms <= start_ms:
        st.warning("The received interval is empty.")
        return

    # Prevent overlap with the prior segment.
    last_end = st.session_state.lastFetchedEnd_ms or 0
    eff_start_ms = max(start_ms, last_end)
    if eff_start_ms >= end_ms:
        st.info("No new parts after the last point.")
        return

    # Best-effort trim of the already-seen prefix; on failure keep the untrimmed audio.
    try:
        b = _trim_wav_prefix(b, sr, eff_start_ms - start_ms)
    except Exception as trim_error:
        if st.session_state.get('debug_mode', False):
            st.write(f"Debug: could not trim overlapping audio: {trim_error}")

    # Checksum is used for de-duplication only, not for security.
    digest = hashlib.md5(b).hexdigest()
    # Skip if identical checksum and same window.
    exists = any(
        s.get('checksum') == digest and s.get('start_ms') == eff_start_ms and s.get('end_ms') == end_ms
        for s in st.session_state.broadcast_segments
    )
    if exists:
        st.info("Duplicate segment ignored.")
        return

    # Show processing feedback during extraction.
    extraction_message = "Extracting text from interval..." if st.session_state.language == 'en' else "جاري استخراج النص من الفترة الزمنية..."
    feedback_placeholder = show_processing_feedback(extraction_message, st.session_state.language)
    try:
        # Run the standard pipeline to get text (no translation to keep it light).
        full_text, model_used = _step1_transcribe_wav(b)
        # Remove the processing message.
        feedback_placeholder.empty()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        raise

    # Append the segment immediately with only the original text.
    seg = {
        'id': digest,
        'recording_id': payload.get('session_id', 'local'),
        'start_ms': eff_start_ms,
        'end_ms': end_ms,
        'checksum': digest,
        'text': full_text,
        'translations': {},
        'transcription_model': model_used,
    }
    st.session_state.broadcast_segments.append(seg)
    # Sort segments by start time (oldest first for internal storage).
    st.session_state.broadcast_segments.sort(key=lambda s: s.get('start_ms', 0))
    if st.session_state.get('debug_mode', False):
        st.write(f"Debug: Added segment #{len(st.session_state.broadcast_segments)}: {eff_start_ms/1000:.1f}s-{end_ms/1000:.1f}s")
    st.session_state.lastFetchedEnd_ms = end_ms

    if full_text:
        if digest not in st.session_state.transcript_ids:
            st.session_state.transcript_ids.add(digest)
            st.session_state.transcript_feed.insert(
                0,
                {
                    "id": digest,
                    "ts": int(time.time() * 1000),
                    "text": full_text,
                },
            )
            st.session_state.edited_text = "\n\n".join(
                [s["text"] for s in st.session_state.transcript_feed]
            )

    # Show immediate success message.
    success_msg = f"✅ {'تم إضافة مقطع جديد' if st.session_state.language == 'ar' else 'Added new segment'}: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s"
    st.success(success_msg)
    # Force a UI refresh so the new segment appears in the broadcast stack.
    # NOTE(review): st.rerun() halts this script run, so nothing placed after it
    # executes. The background translation/summary thread that used to follow
    # this call was therefore unreachable and has been removed. Translations are
    # filled in lazily by the broadcast view; summary generation is not wired
    # anywhere in this block and needs a hook that runs before the rerun.
    st.rerun()


def _step1_render_broadcast_view():
    """Render the broadcast expander: language selector plus all segments, newest first."""
    with st.expander("📻 Broadcast (latest first)", expanded=True):
        # Language selector for broadcast translations.
        try:
            translator = get_translator()
            langs = translator.get_supported_languages()
            codes = list(langs.keys())
            labels = ["detect language — Arabic (العربية)"] + [f"{code}{langs[code]}" for code in codes]
            current = st.session_state.get('broadcast_translation_lang', 'ar')
            # If not set to a known code, default to 'detect'.
            if current not in codes and current != 'detect':
                current = 'detect'
            default_index = 0 if current == 'detect' else codes.index(current) + 1
            sel_label = st.selectbox("Broadcast translation language", labels, index=default_index)
            # Resolve the code from the option's position instead of parsing the
            # label text, so the result does not depend on the label format.
            sel_index = labels.index(sel_label)
            sel_code = 'detect' if sel_index == 0 else codes[sel_index - 1]
            st.session_state.broadcast_translation_lang = sel_code
        except Exception:
            sel_code = st.session_state.get('broadcast_translation_lang', 'ar')

        if not st.session_state.broadcast_segments:
            st.caption("No segments yet. Use the Custom button while recording.")
            return

        # Show all segments (no pagination for broadcast view).
        total_segments = len(st.session_state.broadcast_segments)
        # Keep only segments that have text and a start time.
        valid_segments = [s for s in st.session_state.broadcast_segments if s.get('text') and s.get('start_ms') is not None]
        if len(valid_segments) != total_segments:
            st.warning(f"Found {total_segments - len(valid_segments)} invalid segments. Showing {len(valid_segments)} valid segments.")
            total_segments = len(valid_segments)
            st.session_state.broadcast_segments = valid_segments

        # Total count banner.
        st.markdown(f"""
<div style="
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 8px 16px;
border-radius: 20px;
text-align: center;
font-weight: 600;
margin-bottom: 15px;
box-shadow: 0 2px 10px rgba(102, 126, 234, 0.3);
">
📻 {'البث المباشر' if st.session_state.language == 'ar' else 'Live Broadcast'}{total_segments} {'مقطع' if st.session_state.language == 'ar' else 'segments'}
</div>
""", unsafe_allow_html=True)

        # Newest first; sorted fresh on every run.
        sorted_segments = sorted(st.session_state.broadcast_segments, key=lambda s: s.get('start_ms', 0), reverse=True)
        # In 'detect' mode the target language is always Arabic.
        target_lang = 'ar' if sel_code == 'detect' else sel_code
        for idx, s in enumerate(sorted_segments, 1):
            segment_id = s.get('id', f"seg_{s['start_ms']}_{s['end_ms']}")
            original_text = s.get('text', '')
            if original_text:
                is_selected = (st.session_state.selected_segment_id == segment_id)
                # Timestamp label for the bubble, prefixed with the segment number.
                timestamp = f"#{idx}{s['start_ms']/1000:.1f}s → {s['end_ms']/1000:.1f}s"
                col_bubble, col_ask = st.columns([5, 1])
                with col_bubble:
                    bubble_html = create_broadcast_bubble(original_text, timestamp, is_selected)
                    st.markdown(bubble_html, unsafe_allow_html=True)
                    if is_selected:
                        st.success("🔍 " + ("هذا النص محدد للأسئلة" if st.session_state.language == 'ar' else "This text is selected for questions"))
                with col_ask:
                    ask_button_text = "🤖 اسأل" if st.session_state.language == 'ar' else "🤖 Ask"
                    button_type = "primary" if not is_selected else "secondary"
                    if st.button(ask_button_text, key=f"ask_{segment_id}", type=button_type, use_container_width=True, help="اسأل الذكاء الاصطناعي عن هذا النص" if st.session_state.language == 'ar' else "Ask AI about this text"):
                        # Select this text and open the question modal.
                        st.session_state.selected_text = original_text
                        st.session_state.selected_segment_id = segment_id
                        st.session_state.show_question_modal = True
                        st.rerun()

            # Show the model used for transcription.
            model_note = s.get('transcription_model', None)
            if model_note:
                st.caption(f"Model used: {model_note}")

            # Ensure and show the translation in the selected language.
            if s.get('text') and st.session_state.get('enable_translation', True):
                if 'translations' not in s or not isinstance(s.get('translations'), dict):
                    s['translations'] = {}
                if target_lang not in s['translations']:
                    # Best-effort lazy translation; a failure just leaves the segment untranslated.
                    try:
                        tx, _ = get_translator().translate_text(s.get('text', ''), target_language=target_lang)
                        if tx:
                            s['translations'][target_lang] = tx
                    except Exception:
                        pass
                if s['translations'].get(target_lang):
                    st.caption(f"Translation ({target_lang.upper()}):")
                    st.write(s['translations'][target_lang])
            st.divider()


def step_1_upload_and_process():
    """Render the "Audio Source" step: upload tab, record tab and broadcast feed.

    The upload tab stores the chosen file in ``st.session_state.audio_data`` and
    starts processing on demand. The record tab hosts the recorder component,
    the Google Docs export/logout buttons and the processing toggles, then
    handles the component's return value: either an interval payload dict
    (``type`` of ``interval_wav`` or ``no_new``) or a full recording treated as
    raw WAV bytes.
    """
    st.header("🎵 " + ("مصدر الصوت" if st.session_state.language == 'ar' else "Audio Source"))
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])

    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader("Choose an audio file", type=['mp3', 'wav', 'm4a'], help="Supported formats: MP3, WAV, M4A")
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)
        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()

    with record_tab:
        st.subheader("Record audio directly from your microphone")
        # Recording instructions.
        if st.session_state.language == 'ar':
            st.info("🎙️ **تحكم بسيط في التسجيل:**\n- اضغط الميكروفون لبدء التسجيل\n- اضغط مرة أخرى للتوقف\n- استخدم الأزرار أدناه للتحكم الإضافي")
        else:
            st.info("🎙️ **Simple Recording Controls:**\n- Click microphone to start recording\n- Click again to stop\n- Use buttons below for additional control")

        # Placeholder columns: recording controls are handled by the audio component itself.
        col_record_info, col_record_controls = st.columns([2, 1])
        with col_record_info:
            pass
        with col_record_controls:
            pass

        # Recording status.
        recording_status_placeholder = st.empty()
        # Use the audio recorder component.
        wav_audio_data = st_audiorec()

        # Show recording status and controls.
        if wav_audio_data:
            if isinstance(wav_audio_data, bytes):
                # Plain bytes: show an audio player and a clear button.
                recording_status_placeholder.success("🎵 " + ("تسجيل جاهز للمعالجة" if st.session_state.language == 'ar' else "Recording ready for processing"))
                col_play, col_clear = st.columns(2)
                with col_play:
                    st.audio(wav_audio_data, format='audio/wav')
                with col_clear:
                    if st.button("🗑️ " + ("مسح التسجيل" if st.session_state.language == 'ar' else "Clear Recording")):
                        st.rerun()
            elif isinstance(wav_audio_data, dict):
                # Dict payload: interval processing.
                recording_status_placeholder.info("🔄 " + ("معالجة المقاطع..." if st.session_state.language == 'ar' else "Processing intervals..."))

        # Google Docs export and logout buttons.
        col_export, col_logout = st.columns([3, 1])
        with col_export:
            export_button_text = "📤 تصدير إلى Google Docs" if st.session_state.language == 'ar' else "📤 Export to Google Docs"
            if st.button(export_button_text, type="primary", use_container_width=True):
                export_to_google_docs_directly()
        with col_logout:
            if google_docs_manager.is_authenticated():
                logout_text = "🚪 خروج" if st.session_state.language == 'ar' else "🚪 Logout"
                if st.button(logout_text, use_container_width=True, help="تسجيل الخروج من Google" if st.session_state.language == 'ar' else "Logout from Google"):
                    logout_from_google()
            else:
                login_status = "غير متصل" if st.session_state.language == 'ar' else "Not logged in"
                st.caption(f"🔒 {login_status}")

        # Processing settings.
        st.markdown("**" + ("إعدادات المعالجة" if st.session_state.language == 'ar' else "Processing Settings") + "**")
        # Auto-process toggle (defaults to False).
        st.session_state.setdefault('auto_process_snapshots', False)
        st.checkbox(
            "معالجة تلقائية للمقاطع" if st.session_state.language == 'ar' else "Auto-process snapshots",
            key='auto_process_snapshots',
            help="عند التفعيل، يتم معالجة المقاطع تلقائياً أثناء التسجيل" if st.session_state.language == 'ar' else "When enabled, snapshots are processed automatically during recording"
        )
        # Background processing toggle. The default comes from session state;
        # passing value= together with a session-state-backed key makes
        # Streamlit warn about a conflicting default.
        st.session_state.setdefault('background_processing', True)
        st.checkbox(
            "معالجة في الخلفية" if st.session_state.language == 'ar' else "Background processing",
            key='background_processing',
            help="يسمح بالاستمرار في استخدام التطبيق أثناء المعالجة" if st.session_state.language == 'ar' else "Allows continued use of the app during processing"
        )

        if wav_audio_data:
            # Two possible payload shapes: interval payload dict, or raw bytes array (legacy).
            if isinstance(wav_audio_data, dict) and wav_audio_data.get('type') in ('interval_wav', 'no_new'):
                _step1_handle_interval_payload(wav_audio_data)
            else:
                # Legacy: treat as full wav bytes.
                bytes_data = bytes(wav_audio_data)
                # This is not the Custom interval mode.
                st.session_state['_custom_active'] = False
                st.session_state.audio_data = bytes_data
                st.audio(bytes_data)
                digest = hashlib.md5(bytes_data).hexdigest()
                last_digest = st.session_state.get('_last_component_digest')
                if st.session_state.auto_process_snapshots and digest != last_digest:
                    st.session_state['_last_component_digest'] = digest
                    task_id = run_audio_processing(bytes_data, "snapshot.wav")
                    if task_id:
                        st.success(f"🔄 {'تم إضافة المقطع للمعالجة' if st.session_state.language == 'ar' else 'Snapshot queued for processing'}")
                else:
                    # Single button for manual processing.
                    if st.button("📝 " + ("استخراج النص" if st.session_state.language == 'ar' else "Extract Text"), type="primary", use_container_width=True):
                        st.session_state['_last_component_digest'] = digest
                        task_id = run_audio_processing(bytes_data, "recorded_audio.wav")
                        if task_id:
                            st.success(f"✅ {'تم إضافة التسجيل للمعالجة' if st.session_state.language == 'ar' else 'Audio queued for processing'}")

        # Always show the Broadcast view in Step 1 (regardless of transcription_data).
        _step1_render_broadcast_view()
# --- Google Logout Function ---
def logout_from_google():
    """Log the user out of Google via ``google_docs_manager`` and report the outcome.

    On success a confirmation is shown, then after a one-second pause the
    script is rerun so the UI reflects the logged-out state. Failures and
    unexpected exceptions are shown as error messages.
    """
    def localized(arabic, english):
        # Choose the message matching the current interface language.
        return arabic if st.session_state.language == 'ar' else english

    try:
        logged_out = google_docs_manager.logout()
        if not logged_out:
            st.error(localized("خطأ في تسجيل الخروج", "Error during logout"))
            return
        st.success(localized("تم تسجيل الخروج بنجاح!", "Successfully logged out!"))
        st.info(localized("يمكنك الآن تسجيل الدخول بحساب آخر", "You can now login with a different account"))
        # Give the user a moment to read the messages before refreshing the UI.
        time.sleep(1)
        st.rerun()
    except Exception as exc:
        st.error(localized(f"خطأ غير متوقع: {str(exc)}", f"Unexpected error: {str(exc)}"))
# --- Direct Google Docs Export Function ---
def export_to_google_docs_directly():
    """Export all broadcast segments to Google Docs and show the result.

    The export is attempted unconditionally with whatever segments are in
    session state (an empty list if there are none). On success the document
    link is shown as a clickable link and as copyable text; on failure the
    error is displayed, with a setup hint when it mentions credentials or
    authentication. Any exception is reported as an unexpected error.
    """
    def localized(arabic, english):
        # Choose the message matching the current interface language.
        return arabic if st.session_state.language == 'ar' else english

    try:
        # All segments, not filtered by timestamp.
        segments = st.session_state.broadcast_segments or []

        progress_text = localized("جاري التصدير إلى Google Docs...", "Exporting to Google Docs...")
        feedback_placeholder = show_processing_feedback(progress_text, st.session_state.language)
        try:
            doc_url, error = google_docs_manager.export_broadcast_to_docs(
                segments,
                ui_language=st.session_state.language,
            )
            # Remove the processing message.
            feedback_placeholder.empty()
        except Exception:
            feedback_placeholder.empty()
            cleanup_processing_state()
            raise

        if not doc_url or error:
            error_msg = error or "Unknown error occurred"
            st.error(localized(f"خطأ في التصدير: {error_msg}", f"Export error: {error_msg}"))
            # Point to the setup instructions when credentials look like the problem.
            lowered = error_msg.lower()
            if "credentials" in lowered or "authentication" in lowered:
                st.info(localized(
                    "📋 يرجى مراجعة ملف GOOGLE_SETUP.md لإعداد Google Docs",
                    "📋 Please check GOOGLE_SETUP.md for Google Docs setup instructions",
                ))
            return

        st.success(localized("تم إنشاء المستند بنجاح!", "Document created successfully!"))
        # Clickable link plus a usage tip.
        st.markdown(localized(
            f"🔗 [فتح المستند في Google Docs]({doc_url})",
            f"🔗 [Open Document in Google Docs]({doc_url})",
        ))
        st.info(localized(
            "💡 نصيحة: اضغط على الرابط أعلاه لفتح المستند في تبويب جديد",
            "💡 Tip: Click the link above to open the document in a new tab",
        ))
        # Also show the raw URL for copying.
        st.code(doc_url, language=None)
        # Show the connection status.
        if google_docs_manager.is_authenticated():
            st.caption(localized("✅ متصل بحساب Google", "✅ Connected to Google account"))
    except Exception as exc:
        st.error(localized(f"خطأ غير متوقع: {str(exc)}", f"Unexpected error: {str(exc)}"))
# --- AI Question Modal Function ---
def show_question_modal():
    """Render the "Ask AI" panel for the currently selected text.

    Shows the selected text, any prior conversation for the active question
    session, selectors for the preferred model and answer language, quick
    question templates and a free-form question box. Submitting a template or
    a custom question hands off to ``process_ai_question``. If no text is
    selected the modal flag is cleared and nothing is drawn.
    """
    if not st.session_state.selected_text:
        st.session_state.show_question_modal = False
        return

    def tr(arabic, english):
        # Choose the string matching the current interface language.
        return arabic if st.session_state.language == 'ar' else english

    question_engine = get_ai_question_engine()

    # Header and selected text.
    st.subheader(tr("🤖 اسأل الذكاء الاصطناعي", "🤖 Ask AI"))
    with st.expander(tr("النص المحدد", "Selected Text"), expanded=True):
        st.write(f"📝 {st.session_state.selected_text}")

    # Conversation history, when a session with entries exists.
    if st.session_state.current_question_session:
        session = question_engine.get_conversation_history(st.session_state.current_question_session)
        if session and session.conversation:
            history_title = tr(
                f"💬 تاريخ المحادثة ({len(session.conversation)} أسئلة)",
                f"💬 Conversation History ({len(session.conversation)} questions)",
            )
            with st.expander(history_title, expanded=False):
                for number, qa in enumerate(session.conversation, 1):
                    st.markdown(f"**{number}. {qa.question}**")
                    st.write(qa.answer)
                    # Timing and model info; colour depends on the provider named in model_used.
                    model_info = getattr(qa, 'model_used', 'Unknown')
                    if "Gemini" in model_info:
                        model_color = "green"
                    elif "Groq" in model_info or "OpenRouter" in model_info:
                        model_color = "orange"
                    else:
                        model_color = "red"
                    caption_text = f"⏱️ {qa.timestamp.strftime('%H:%M:%S')} - {qa.response_time_ms}ms"
                    model_text = f"🔧 {model_info}"
                    st.caption(caption_text)
                    st.markdown(f"<small style='color: {model_color}'>{model_text}</small>", unsafe_allow_html=True)
                    if number < len(session.conversation):
                        st.divider()

    # Model selection.
    st.markdown("**" + tr("اختيار النموذج", "Model Selection") + "**")
    models_status = question_engine.check_model_availability()
    # "Auto" comes first, followed by every model with its status indicator.
    model_options = {tr("🔄 تلقائي (أفضل نموذج متاح)", "🔄 Auto (Best available model)"): 'auto'}
    for model_name, status_info in models_status.items():
        model_options[f"{status_info['icon']} {model_name} - {status_info['message']}"] = model_name

    current_model = st.session_state.get('preferred_ai_model', 'auto')
    current_index = next(
        (position for position, name in enumerate(model_options.values()) if name == current_model),
        0,
    )
    selected_model_display = st.selectbox(
        tr("النموذج المفضل", "Preferred Model"),
        options=list(model_options.keys()),
        index=current_index,
        help=tr("اختر النموذج المفضل للإجابة", "Choose preferred model for answering"),
    )
    selected_model = model_options[selected_model_display]
    st.session_state.preferred_ai_model = selected_model

    # Explain why a specifically chosen model may not be usable.
    if selected_model != 'auto':
        model_state = models_status[selected_model]['status']
        if model_state == 'quota_exceeded':
            st.warning(tr("⚠️ هذا النموذج استنفد حصته اليومية", "⚠️ This model has exceeded its daily quota"))
        elif model_state == 'not_configured':
            st.info(tr("ℹ️ هذا النموذج غير مُعد - سيتم استخدام البديل", "ℹ️ This model is not configured - fallback will be used"))
        elif model_state != 'available':
            st.error(tr("❌ هذا النموذج غير متاح حالياً", "❌ This model is currently unavailable"))

    # Answer language selection.
    st.markdown("**" + tr("لغة الإجابة", "Answer Language") + "**")
    language_options = {
        tr("🔄 تلقائي (حسب لغة الواجهة)", "🔄 Auto (Interface language)"): 'auto',
        "🇸🇦 العربية": 'ar',
        "🇺🇸 English": 'en',
        "🇫🇷 Français": 'fr',
        "🇪🇸 Español": 'es',
        "🇩🇪 Deutsch": 'de',
        "🇨🇳 中文": 'zh',
    }
    current_lang = st.session_state.get('preferred_answer_language', 'auto')
    current_lang_index = next(
        (position for position, code in enumerate(language_options.values()) if code == current_lang),
        0,
    )
    selected_language_display = st.selectbox(
        tr("لغة الإجابة المفضلة", "Preferred Answer Language"),
        options=list(language_options.keys()),
        index=current_lang_index,
        help=tr("اختر اللغة التي تريد الحصول على الإجابة بها", "Choose the language for AI responses"),
    )
    selected_answer_language = language_options[selected_language_display]
    st.session_state.preferred_answer_language = selected_answer_language

    # Tell the user which language the answers will use.
    if selected_answer_language == 'auto':
        current_ui_lang = tr("العربية", "English")
        st.caption(tr(
            f"ℹ️ سيتم استخدام لغة الواجهة الحالية: {current_ui_lang}",
            f"ℹ️ Will use current interface language: {current_ui_lang}",
        ))
    else:
        lang_names = {'ar': 'العربية', 'en': 'English', 'fr': 'Français', 'es': 'Español', 'de': 'Deutsch', 'zh': '中文'}
        selected_lang_name = lang_names.get(selected_answer_language, selected_answer_language)
        st.caption(tr(
            f"ℹ️ الإجابات ستكون باللغة: {selected_lang_name}",
            f"ℹ️ Answers will be in: {selected_lang_name}",
        ))

    # Quick question templates: first six, laid out in two columns.
    st.markdown("**" + tr("قوالب الأسئلة السريعة", "Quick Question Templates") + "**")
    templates = question_engine.get_question_templates(st.session_state.language)
    cols = st.columns(2)
    for position, template in enumerate(templates[:6]):
        column = cols[position % 2]
        with column:
            if st.button(template, key=f"template_{position}", use_container_width=True):
                process_ai_question(template, is_template=True, preferred_model=selected_model, answer_language=selected_answer_language)
                return

    # Custom question input.
    st.markdown("**" + tr("أو اكتب سؤالك الخاص", "Or Write Your Own Question") + "**")
    custom_question = st.text_area(
        tr("سؤالك", "Your Question"),
        placeholder=tr("اكتب سؤالك هنا...", "Type your question here..."),
        height=100,
    )

    # Ask / cancel buttons.
    col_ask, col_cancel = st.columns(2)
    with col_ask:
        ask_clicked = st.button(tr("🚀 اسأل", "🚀 Ask"), type="primary", disabled=not custom_question.strip())
        if ask_clicked:
            if custom_question.strip():
                process_ai_question(custom_question.strip(), is_template=False, preferred_model=selected_model, answer_language=selected_answer_language)
                return
    with col_cancel:
        if st.button(tr("❌ إلغاء", "❌ Cancel")):
            # Close the modal and drop the current selection.
            st.session_state.show_question_modal = False
            st.session_state.selected_text = None
            st.session_state.selected_segment_id = None
            st.rerun()
def process_ai_question(question: str, is_template: bool = False, preferred_model: str = 'auto', answer_language: str = 'auto') -> None:
    """Send a question about the selected text to the AI engine and render the reply.

    Reads the selected text, segment id and question-session id from
    ``st.session_state``, shows a processing indicator while the engine runs,
    then renders either the answer (with model/language info and action
    buttons) or the error (with retry/close buttons).

    Args:
        question: The question text to ask.
        is_template: Whether the question came from a quick template. It is
            only forwarded when the user retries.
        preferred_model: Model identifier forwarded to the engine
            (``'auto'`` by default).
        answer_language: Language code for the answer. ``'auto'`` means the
            current UI language (``st.session_state.language``) is used.

    Raises:
        Exception: Anything raised while the engine processes the question is
            re-raised after the processing indicator has been cleared and
            ``cleanup_processing_state()`` has been called.
    """
    question_engine = get_ai_question_engine()
    is_ar = st.session_state.language == 'ar'

    # Prepare segment info; only the segment id is known here, timings are placeholders
    segment_info = {
        'id': st.session_state.selected_segment_id,
        'start_ms': 0,
        'end_ms': 0
    }

    # Show processing indicator using custom feedback
    processing_message = "جاري معالجة سؤالك..." if is_ar else "Processing your question..."
    feedback_placeholder = show_processing_feedback(processing_message, st.session_state.language)

    try:
        # 'auto' means: answer in the current UI language
        answer_lang = st.session_state.language if answer_language == 'auto' else answer_language

        result = question_engine.process_question(
            selected_text=st.session_state.selected_text,
            question=question,
            segment_info=segment_info,
            ui_language=answer_lang,  # Use selected answer language
            session_id=st.session_state.current_question_session,
            preferred_model=preferred_model
        )

        # Handle different return formats for backward compatibility
        if len(result) == 4:
            answer, error, session_id, model_used = result
        else:
            answer, error, session_id = result
            model_used = "Unknown"

        # Remove the processing message
        feedback_placeholder.empty()
    except Exception:
        feedback_placeholder.empty()
        cleanup_processing_state()
        # Bare raise keeps the original traceback intact
        raise

    # Update session ID
    st.session_state.current_question_session = session_id

    if answer:
        # Check response type and model fallback
        is_simple_response = "ملاحظة: هذه إجابة مبسطة" in answer or "Note: This is a simplified response" in answer
        # Kept in its own local so the `preferred_model` argument is not
        # overwritten and can still be forwarded unchanged.
        session_model = st.session_state.get('preferred_ai_model', 'auto')
        model_fallback = session_model != 'auto' and session_model != model_used

        if is_simple_response:
            st.warning("⚠️ خدمة الذكاء الاصطناعي غير متاحة حالياً - إجابة مبسطة" if is_ar else "⚠️ AI service temporarily unavailable - simplified response")
        elif model_fallback:
            st.info(f"ℹ️ النموذج المفضل ({session_model}) غير متاح - تم استخدام {model_used}" if is_ar else f"ℹ️ Preferred model ({session_model}) unavailable - used {model_used}")
        else:
            st.success("تم الحصول على الإجابة!" if is_ar else "Got the answer!")

        # Display Q&A
        st.markdown("### " + ("السؤال" if is_ar else "Question"))
        st.write(f"❓ {question}")
        st.markdown("### " + ("الإجابة" if is_ar else "Answer"))
        st.write(f"🤖 {answer}")

        # Show which model was used with enhanced styling
        if model_used:
            # Get model status for better display
            models_status = question_engine.check_model_availability()
            model_info = models_status.get(model_used, {})
            icon = model_info.get('icon', '🤖')
            color = model_info.get('color', 'gray')

            # Show if user's preferred model was used or fallback occurred
            if model_fallback:
                fallback_msg = " (تم التبديل للبديل)" if is_ar else " (fallback used)"
                color = "orange"
            else:
                fallback_msg = ""
            model_display = f"{icon} {model_used}{fallback_msg}"

            # Show answer language info
            shown_lang = st.session_state.get('preferred_answer_language', 'auto')
            if shown_lang == 'auto':
                lang_display = "تلقائي" if is_ar else "Auto"
                lang_flag = "🔄"
            else:
                lang_flags = {'ar': '🇸🇦', 'en': '🇺🇸', 'fr': '🇫🇷', 'es': '🇪🇸', 'de': '🇩🇪', 'zh': '🇨🇳'}
                lang_names = {'ar': 'العربية', 'en': 'English', 'fr': 'Français', 'es': 'Español', 'de': 'Deutsch', 'zh': '中文'}
                lang_flag = lang_flags.get(shown_lang, '🌐')
                lang_display = lang_names.get(shown_lang, shown_lang)

            info_text = f"🔧 {'النموذج' if is_ar else 'Model'}: {model_display} | 🌐 {'اللغة' if is_ar else 'Language'}: {lang_flag} {lang_display}"
            st.markdown(f"<div style='background-color: rgba(0,0,0,0.1); padding: 8px; border-radius: 5px; margin: 5px 0;'><small style='color: {color}'>{info_text}</small></div>", unsafe_allow_html=True)

        # Show additional help for simple responses
        if is_simple_response:
            with st.expander("💡 نصائح للحصول على إجابات أفضل" if is_ar else "💡 Tips for better answers"):
                if is_ar:
                    st.markdown("""
**لماذا الإجابة مبسطة؟**
- تم استنفاد الحد اليومي لخدمة Gemini AI المجانية (50 طلب/يوم)
- النظام يستخدم إجابات مبسطة كبديل مؤقت
**للحصول على إجابات أفضل:**
- حاول مرة أخرى غداً (يتم تجديد الحد اليومي)
- اطرح أسئلة أكثر تحديداً
- ابحث في مصادر إضافية للموضوع
""")
                else:
                    st.markdown("""
**Why is the answer simplified?**
- Daily limit for free Gemini AI service exceeded (50 requests/day)
- System is using simplified responses as temporary fallback
**For better answers:**
- Try again tomorrow (daily limit resets)
- Ask more specific questions
- Search additional sources for the topic
""")

        # Action buttons for the response
        col_copy, col_follow, col_close = st.columns(3)
        with col_copy:
            if st.button("📋 نسخ" if is_ar else "📋 Copy"):
                # Format for copying
                copy_text = f"السؤال: {question}\nالإجابة: {answer}" if is_ar else f"Question: {question}\nAnswer: {answer}"
                st.code(copy_text, language=None)
                st.success("تم تنسيق النص للنسخ أعلاه" if is_ar else "Text formatted for copying above")
        with col_follow:
            if st.button("➕ سؤال متابعة" if is_ar else "➕ Follow-up"):
                # Keep modal open for follow-up question
                st.rerun()
        with col_close:
            if st.button("✅ إغلاق" if is_ar else "✅ Close"):
                st.session_state.show_question_modal = False
                st.session_state.selected_text = None
                st.session_state.selected_segment_id = None
                st.rerun()
    else:
        # Show error
        st.error(f"خطأ: {error}" if is_ar else f"Error: {error}")

        # Retry and close buttons
        col_retry, col_close = st.columns(2)
        with col_retry:
            if st.button("🔄 إعادة المحاولة" if is_ar else "🔄 Retry"):
                # Forward the caller's model and language choices so a retry
                # does not silently fall back to the 'auto' defaults.
                process_ai_question(
                    question,
                    is_template,
                    preferred_model=preferred_model,
                    answer_language=answer_language,
                )
                return
        with col_close:
            if st.button("❌ إغلاق" if is_ar else "❌ Close"):
                st.session_state.show_question_modal = False
                st.session_state.selected_text = None
                st.session_state.selected_segment_id = None
                st.rerun()
# --- Export Modal Function ---
def show_export_modal():
    """Render the export dialog: segment preview, format options and Google sign-in.

    Closes itself (clears ``show_export_modal``) when there is no export
    timestamp or no broadcast segments. Otherwise it lists the segments whose
    ``start_ms`` is at or after the export timestamp and offers Export/Cancel.
    """
    # Create the Google Docs auth helper once and keep it in session state
    if 'google_auth' not in st.session_state:
        st.session_state.google_auth = GoogleDocsAuth()
    google_auth = st.session_state.google_auth

    # Nothing to export without a timestamp and at least one segment
    if not st.session_state.export_timestamp or not st.session_state.broadcast_segments:
        st.session_state.show_export_modal = False
        return

    def pick(arabic_text, english_text):
        # Choose the string matching the current UI language
        return arabic_text if st.session_state.language == 'ar' else english_text

    # Segments at or after the export timestamp, oldest first
    cutoff_ms = st.session_state.export_timestamp
    export_segments = sorted(
        (seg for seg in st.session_state.broadcast_segments if seg.get('start_ms', 0) >= cutoff_ms),
        key=lambda seg: seg.get('start_ms', 0),
    )

    if not export_segments:
        st.warning(pick("لا توجد مقاطع جديدة للتصدير منذ الضغط على الزر", "No new segments to export since button press"))
        if st.button(pick("إغلاق", "Close")):
            st.session_state.show_export_modal = False
            st.rerun()
        return

    # Export preview header
    st.subheader(pick("📋 معاينة التصدير", "📋 Export Preview"))
    export_time = datetime.fromtimestamp(st.session_state.export_timestamp / 1000)
    st.info(f"{pick('المقاطع من وقت', 'Segments from')}: {export_time.strftime('%H:%M:%S')}")
    st.info(f"{pick('عدد المقاطع', 'Number of segments')}: {len(export_segments)}")

    # Preview of the first three segments, each truncated to 100 characters
    total = len(export_segments)
    with st.expander(pick("معاينة المحتوى", "Content Preview"), expanded=False):
        for idx, seg in enumerate(export_segments[:3]):
            start_s = seg.get('start_ms', 0) / 1000
            end_s = seg.get('end_ms', 0) / 1000
            st.markdown(f"**[{start_s:.2f}s → {end_s:.2f}s]**")
            seg_text = seg.get('text', '')
            st.write(seg_text[:100] + "..." if len(seg_text) > 100 else seg_text)
            # Divider between previewed segments, but not after the last one
            if idx < 2 and idx < total - 1:
                st.divider()
        if total > 3:
            st.caption(f"... {pick('و', 'and')} {total - 3} {pick('مقاطع أخرى', 'more segments')}")

    # Export options
    format_col, summary_col = st.columns(2)
    with format_col:
        format_options = {
            "📄 Word Document": "word",
            "📝 Google Docs": "google_docs"
        }
        chosen_label = st.selectbox(
            pick("تنسيق التصدير", "Export Format"),
            options=list(format_options.keys()),
            index=0
        )
        st.session_state.export_format = format_options[chosen_label]
    with summary_col:
        include_summary = st.checkbox(
            pick("تضمين الملخص", "Include Summary"),
            value=True
        )

    # Google Docs authentication section
    if st.session_state.export_format == 'google_docs':
        st.markdown("---")
        if google_auth.is_authenticated():
            st.success("✅ " + pick("متصل بـ Google Docs", "Connected to Google Docs"))
            logout_col, info_col = st.columns([1, 2])
            with logout_col:
                if st.button("🚪 " + pick("تسجيل خروج", "Logout")):
                    google_auth.logout()
                    st.rerun()
            with info_col:
                st.caption(pick("سيتم إنشاء المستند في حسابك على Google", "Document will be created in your Google account"))
        else:
            st.warning("🔐 " + pick("يجب تسجيل الدخول إلى Google Docs أولاً", "Please authenticate with Google Docs first"))

            # Handle OAuth callback (the code arrives as a URL query parameter)
            auth_code = st.query_params.get("code")
            if auth_code:
                success, message = google_auth.handle_auth_callback(auth_code)
                if success:
                    st.success(message)
                    # Clear the code from URL
                    st.query_params.clear()
                    st.rerun()
                else:
                    st.error(message)

            # Show authentication button
            auth_url, error = google_auth.get_auth_url()
            if auth_url:
                sign_in_label = pick("تسجيل الدخول إلى Google", "Sign in with Google")
                st.markdown(f"""
<a href="{auth_url}" target="_blank">
<button style="background-color: #4285f4; color: white; padding: 10px 20px; border: none; border-radius: 5px; cursor: pointer;">
🔗 {sign_in_label}
</button>
</a>
""", unsafe_allow_html=True)
                st.caption(pick("سيتم فتح نافذة جديدة للمصادقة", "A new window will open for authentication"))
            else:
                st.error(pick(f"خطأ في إعداد Google API: {error}", f"Google API setup error: {error}"))
                st.info(pick("يرجى إعداد GOOGLE_CLIENT_ID و GOOGLE_CLIENT_SECRET في متغيرات البيئة", "Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET environment variables"))

    # Export buttons
    export_col, cancel_col = st.columns(2)
    with export_col:
        # Export stays disabled while Google Docs is selected but not authenticated
        wants_google_docs = st.session_state.export_format == 'google_docs'
        export_disabled = (wants_google_docs and not google_auth.is_authenticated())
        export_button_text = pick("🚀 تصدير", "🚀 Export")
        if st.button(export_button_text, type="primary", disabled=export_disabled):
            perform_export(export_segments, include_summary, google_auth)
    with cancel_col:
        if st.button(pick("❌ إلغاء", "❌ Cancel")):
            st.session_state.show_export_modal = False
            st.rerun()
# --- Export Execution Function ---
def perform_export(segments, include_summary=True, google_auth=None):
    """Export the given segments in the format chosen in session state.

    Builds an ``ExportConfig`` from session state, runs the exporter while a
    processing indicator is shown, then renders a download button (Word), a
    link (Google Docs) or an error. Any exception is reported with
    ``st.error`` instead of propagating. A Close button is always rendered
    at the end.

    Args:
        segments: Segments passed to ``exporter.prepare_export_content``.
        include_summary: Forwarded to ``ExportConfig(include_summary=...)``.
        google_auth: Forwarded to ``exporter.export_with_fallback``.
    """
    def pick(arabic_text, english_text):
        # Choose the string matching the current UI language
        return arabic_text if st.session_state.language == 'ar' else english_text

    try:
        # Exporter and its configuration
        exporter = BroadcastExporter(get_translator())
        config = ExportConfig(
            export_timestamp=st.session_state.export_timestamp,
            format_type=st.session_state.export_format,
            include_summary=include_summary,
            ui_language=st.session_state.language,
            target_language=st.session_state.get('broadcast_translation_lang', 'ar')
        )

        # Prepare export content
        content = exporter.prepare_export_content(segments, config)

        # Show progress using custom feedback
        feedback_placeholder = show_processing_feedback(
            pick("جاري التصدير...", "Exporting..."),
            st.session_state.language,
        )
        try:
            # Perform export with fallback
            result, error = exporter.export_with_fallback(content, config, google_auth)
            # Remove the processing message
            feedback_placeholder.empty()
        except Exception:
            feedback_placeholder.empty()
            cleanup_processing_state()
            raise

        if error or not result:
            st.error(pick(f"خطأ في التصدير: {error}", f"Export error: {error}"))
        elif config.format_type == 'word':
            # `result` is used as the path of the generated Word file: offer it for download
            with open(result, 'rb') as doc_file:
                doc_bytes = doc_file.read()
            st.download_button(
                label=pick("📥 تحميل الملف", "📥 Download File"),
                data=doc_bytes,
                file_name=os.path.basename(result),
                mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            )
            st.success(pick("تم إنشاء الملف بنجاح!", "File created successfully!"))
        else:
            # `result` is used as the Google Docs URL
            st.success(pick("تم إنشاء المستند بنجاح!", "Document created successfully!"))
            st.markdown(pick(f"[فتح في Google Docs]({result})", f"[Open in Google Docs]({result})"))
    except Exception as e:
        st.error(pick(f"خطأ غير متوقع: {str(e)}", f"Unexpected error: {str(e)}"))

    # Close modal after export attempt
    if st.button(pick("إغلاق", "Close")):
        st.session_state.show_export_modal = False
        st.rerun()
# Note: external live slice helper removed to keep the app simple and fully local
# --- Step 2: Review and Customize (REMOVED) ---
# This section was removed at the user's request to simplify the interface
# Results are now shown directly in show_processing_results() function
def reset_session():
    """Drop the per-run workflow keys from session state, then re-initialize it."""
    log_to_browser_console("--- INFO: Resetting session state. ---")
    stale_keys = ('step', 'audio_data', 'transcription_data', 'edited_text', 'video_style', 'new_recording')
    for name in stale_keys:
        # Keys that were never set are simply skipped
        if name not in st.session_state:
            continue
        del st.session_state[name]
    initialize_session_state()
# --- Entry Point ---
# Start the app only when run as a script and check_api_key() returns a truthy value
if __name__ == "__main__" and check_api_key():
    initialize_session_state()
    main()