Spaces:
Sleeping
Sleeping
| # app.py - Refactored to eliminate recorder_server.py dependency | |
| import streamlit as st | |
| import os | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| import time | |
| import traceback | |
| import streamlit.components.v1 as components | |
| import hashlib | |
| # from st_audiorec import st_audiorec # Import the new recorder component - OLD | |
| # Reduce metrics/usage writes that can cause permission errors on hosted environments | |
# Usage-stat collection writes to disk, which can fail with permission errors
# on hosted environments, so it is switched off on a best-effort basis.
try:
    st.set_option('browser.gatherUsageStats', False)
except Exception:
    pass

# Recorder component lookup: a local frontend build is preferred; the
# pip-installed package is only used when this directory is missing.
parent_dir = os.path.dirname(os.path.abspath(__file__))
build_dir = os.path.join(
    parent_dir,
    "custom_components/st-audiorec/st_audiorec/frontend/build",
)
def st_audiorec(key=None):
    """Render the audio recorder component and return whatever it reports.

    The locally built frontend in ``build_dir`` is used when that directory
    exists; otherwise the pip-installed ``st_audiorec`` package is tried.
    If neither can be used, a warning is shown and ``None`` is returned.
    """
    try:
        if not os.path.isdir(build_dir):
            # No local build: fall back to the pip-installed component.
            try:
                from st_audiorec import st_audiorec as packaged_recorder
                return packaged_recorder(key=key)
            except Exception:
                st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
                return None

        local_recorder = components.declare_component("st_audiorec", path=build_dir)
        return local_recorder(key=key, default=0)
    except Exception:
        # Last-resort safety net so a broken component never takes the page down.
        st.warning("Failed to initialize audio recorder component.")
        return None
| # --- Critical Imports and Initial Checks --- | |
# The processor import is guarded so that a failure can be reported inside the
# UI (with its traceback) instead of crashing the script at import time.
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
except Exception:
    AUDIO_PROCESSOR_CLASS = None
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()
else:
    AUDIO_PROCESSOR_CLASS = AudioProcessor
| from video_generator import VideoGenerator | |
| from mp3_embedder import MP3Embedder | |
| from utils import format_timestamp | |
| from translator import get_translator, UI_TRANSLATIONS | |
| import requests | |
| from dotenv import load_dotenv | |
| # --- API Key Check --- | |
def check_api_key():
    """Return True when GEMINI_API_KEY is set; otherwise show setup help and return False.

    The ``.env`` file is loaded first so a key stored there is visible through
    the process environment.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True

    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
1. **Find the file named `.env.example`** in the `syncmaster2` directory.
2. **Rename it to `.env`**.
3. **Open the `.env` file** with a text editor.
4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
6. **Save the file and restart the application.**
""")
    return False
| # --- Summary Helper (robust to cached translator without summarize_text) --- | |
def generate_summary(text: str, target_language: str = 'ar'):
    """Generate a concise summary of *text* in *target_language*.

    Strategy:
      1. Use ``translator.summarize_text`` when the translator instance has it
         (a cached, older instance may not).
      2. Otherwise, or when step 1 produced no summary, build an Arabic summary
         with ``summarize_text_arabic`` and translate it to *target_language*
         when that language is not Arabic.

    Returns:
        ``(summary, error)``. ``summary`` is the text or ``None``; ``error`` is
        ``None`` on success, otherwise the most relevant error reported along
        the way. If translating the Arabic summary fails, the Arabic summary
        is still returned together with the translation error.
    """
    tr = get_translator()
    # Error reported by the primary path. It is kept so that it is not lost
    # when the fallback path also fails without a message of its own.
    primary_err = None
    try:
        if callable(getattr(tr, 'summarize_text', None)):
            summary, primary_err = tr.summarize_text(text or '', target_language=target_language)
            if summary:
                return summary, None

        # Fallback path: Arabic summary first.
        s_ar, err_ar = tr.summarize_text_arabic(text or '')
        if not s_ar:
            return s_ar, err_ar or primary_err

        if target_language and target_language != 'ar':
            translated, err_tx = tr.translate_text(s_ar, target_language=target_language)
            if translated:
                return translated, None
            # Translation failed: hand back the Arabic summary plus the reason.
            return s_ar, err_tx
        return s_ar, err_ar
    except Exception as e:
        # Prefer the error from the primary path over a secondary failure in
        # the fallback (e.g. a translator without summarize_text_arabic).
        return None, primary_err or str(e)
| # --- Page Configuration --- | |
# Browser tab title/icon and a full-width layout for the whole app.
st.set_page_config(page_title="SyncMaster - AI Audio-Text Synchronization", page_icon="🎵", layout="wide")
| # --- Browser Console Logging Utility --- | |
def _js_string_literal(value):
    """Return *value* as a JavaScript string literal that is safe inside <script>.

    ``json.dumps`` leaves ``<``, ``>`` and ``&`` untouched, so a message that
    contains ``</script>`` or ``<!--`` would terminate or corrupt the script
    element. Escaping them as ``\\uXXXX`` keeps the decoded string identical.
    """
    literal = json.dumps(str(value))
    return (
        literal
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )


def log_to_browser_console(messages):
    """Inject JavaScript that writes *messages* to the browser console.

    Args:
        messages: A single string or an iterable of values (each is converted
            with ``str``). ``None`` is treated as "nothing to log".

    Messages containing ``--- ERROR``/``--- FATAL``, ``--- WARNING`` or
    ``--- DEBUG`` are routed to ``console.error``, ``console.warn`` and
    ``console.debug`` respectively; everything else goes to ``console.log``.
    """
    if messages is None:
        messages = []
    elif isinstance(messages, str):
        messages = [messages]

    js_literals = ', '.join(_js_string_literal(msg) for msg in messages)
    js_code = f"""
    <script>
    (function() {{
        const logs = [{js_literals}];
        console.group("Backend Logs from SyncMaster");
        logs.forEach(log => {{
            const content = String(log);
            if (content.includes('--- ERROR') || content.includes('--- FATAL')) {{
                console.error(log);
            }} else if (content.includes('--- WARNING')) {{
                console.warn(log);
            }} else if (content.includes('--- DEBUG')) {{
                console.debug(log);
            }} else {{
                console.log(log);
            }}
        }});
        console.groupEnd();
    }})();
    </script>
    """
    components.html(js_code, height=0, scrolling=False)
| # --- Session State Initialization --- | |
def initialize_session_state():
    """Populate ``st.session_state`` with defaults for every key that is missing.

    Existing values are never overwritten. The table below is rebuilt on each
    call, so the mutable defaults (lists, set, dict) are fresh objects.
    """
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,
        # Transcript feed (latest first) and the ids used to de-duplicate it.
        'transcript_feed': [],  # list of {id, ts, text}
        'transcript_ids': set(),
        # Incremental broadcast state.
        'broadcast_segments': [],  # [{id, recording_id, start_ms, end_ms, checksum, text}]
        'lastFetchedEnd_ms': 0,
        # Broadcast translation target, separate from the general translation
        # target; defaults to Arabic.
        'broadcast_translation_lang': 'ar',
        # Summary language defaults to Arabic.
        'summary_language': 'ar',
        # Toggle for automatic summary generation.
        'auto_generate_summary': True,
    }
    for state_key, default_value in defaults.items():
        st.session_state.setdefault(state_key, default_value)
| # --- Centralized Audio Processing Function --- | |
def _remember_transcript(audio_bytes, text):
    """Prepend *text* to the transcript feed (once per distinct audio) and rebuild ``edited_text``."""
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        # Deliberate best-effort: if hashing is unavailable, fall back to a
        # time-based id so the entry is still recorded.
        digest = f"snap-{int(time.time()*1000)}"

    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0, {"id": digest, "ts": int(time.time() * 1000), "text": text}
        )
    # Newest entry first.
    st.session_state.edited_text = "\n\n".join(
        entry["text"] for entry in st.session_state.transcript_feed
    )


def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Transcribe (and optionally translate) *audio_bytes* and store the result.

    The audio is written to a temporary file, handed to the audio processor,
    and the outcome is stored in ``st.session_state.transcription_data`` and
    the transcript feed. The temporary file is always removed.

    Args:
        audio_bytes: Raw audio file content.
        original_filename: Only its suffix is used (for the temp file and for
            ``original_suffix`` in the stored result).

    Returns:
        None. On success the script is re-run after a short pause so the
        results render. On a warning or error the message stays on screen and
        no re-run is triggered.
    """
    # This is the classic, non-Custom path; make sure editor sections are enabled.
    st.session_state['_custom_active'] = False
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return

    tmp_file_path = None
    succeeded = False
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        suffix = Path(original_filename).suffix
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Remember the path before writing so a failed write is still cleaned up.
            tmp_file_path = tmp_file.name
            tmp_file.write(audio_bytes)

        processor = AUDIO_PROCESSOR_CLASS()

        if st.session_state.enable_translation:
            with st.spinner("⏳ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language,
                )
            log_to_browser_console(processor_logs)
            if not result_data or not result_data.get("original_text"):
                st.warning(
                    "Could not generate transcription with translation. Check browser console (F12) for logs."
                )
                return
            st.session_state.transcription_data = {
                "text": result_data["original_text"],
                "translated_text": result_data["translated_text"],
                "word_timestamps": result_data["word_timestamps"],
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": result_data.get("translation_success", False),
                "detected_language": result_data.get("language_detected", "unknown"),
            }
            _remember_transcript(audio_bytes, result_data["original_text"])
        else:
            # Standard processing without translation.
            with st.spinner("⏳ Performing AI Transcription... please wait."):
                # Another call site in this file unpacks three values from
                # get_word_timestamps; index instead of unpacking so either
                # return arity works here.
                outcome = processor.get_word_timestamps(tmp_file_path)
                word_timestamps, processor_logs = outcome[0], outcome[1]
            log_to_browser_console(processor_logs)
            if not word_timestamps:
                st.warning(
                    "Could not generate timestamps. Check browser console (F12) for logs."
                )
                return
            full_text = " ".join(d["word"] for d in word_timestamps)
            st.session_state.transcription_data = {
                "text": full_text,
                "word_timestamps": word_timestamps,
                "audio_bytes": audio_bytes,
                "original_suffix": suffix,
                "translation_success": False,
            }
            _remember_transcript(audio_bytes, full_text)

        st.session_state.step = 1  # Keep it on the same step
        st.success("🎉 AI processing complete! Results are shown below.")
        succeeded = True
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)

    # NOTE(review): the scraped source lost its indentation, so whether the
    # pause + rerun originally ran on failures too cannot be determined. It is
    # limited to success here because a rerun would erase the warning/error
    # (which points the user at the console) one second after showing it.
    if succeeded:
        time.sleep(1)
        st.rerun()
| # --- Main Application Logic --- | |
def main():
    """Render the whole page: sidebar settings, header, recorder settings, Step 1 and Step 2."""
    initialize_session_state()
    state = st.session_state

    st.markdown("""
<style>
.main .block-container { animation: fadeIn 0.2s ease-in-out; }
@keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } }
.block-container { padding-top: 1rem; }
</style>
""", unsafe_allow_html=True)

    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        ui_languages = {'English': 'en', 'العربية': 'ar'}
        chosen_ui_label = st.selectbox(
            "Interface Language",
            options=list(ui_languages.keys()),
            index=0 if state.language == 'en' else 1
        )
        state.language = ui_languages[chosen_ui_label]
        english_ui = state.language == 'en'

        st.markdown("## 🔤 Translation Settings")
        state.enable_translation = st.checkbox(
            "Enable AI Translation" if english_ui else "تفعيل الترجمة بالذكاء الاصطناعي",
            value=state.enable_translation,
            help="Automatically translate transcribed text" if english_ui else "ترجمة النص تلقائياً"
        )
        if state.enable_translation:
            translation_targets = {
                'Arabic (العربية)': 'ar', 'English': 'en', 'French (Français)': 'fr', 'Spanish (Español)': 'es'
            }
            chosen_target_label = st.selectbox(
                "Target Language" if english_ui else "اللغة المستهدفة",
                options=list(translation_targets.keys()), index=0
            )
            state.target_language = translation_targets[chosen_target_label]

        # Auto summary toggle.
        # NOTE(review): indentation was lost in the scraped source; this toggle
        # is assumed to sit at sidebar level rather than under the translation
        # checkbox — confirm against the repository.
        state.auto_generate_summary = st.checkbox(
            "Auto-generate Arabic summary" if english_ui else "توليد الملخص العربي تلقائياً",
            value=state.auto_generate_summary
        )

    st.title("🎵 SyncMaster")
    if state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")

    left_col, right_col = st.columns(2)
    with left_col:
        st.markdown(f"**{'✅' if state.step >= 1 else '1️⃣'} Step 1: Upload & Process**")
    with right_col:
        st.markdown(f"**{'✅' if state.step >= 2 else '2️⃣'} Step 2: Review & Customize**")
    st.divider()

    # Global settings for long recording retention and custom snapshot duration.
    with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False):
        state.setdefault('retention_minutes', 30)
        # 0 means: use the full buffer by default for Custom.
        state.setdefault('custom_snapshot_seconds', 0)
        # Interval used by the frontend to auto-trigger Custom.
        state.setdefault('auto_custom_interval_sec', 10)
        # Whether incremental snapshots start as soon as recording begins.
        state.setdefault('auto_start_custom', False)

        # (state key, label, min, max, extra keyword arguments)
        numeric_settings = [
            ('retention_minutes', "Retention window (minutes)", 5, 240, {}),
            ('custom_snapshot_seconds', "Custom snapshot (seconds; 0 = full buffer)", 0, 3600, {}),
            ('auto_custom_interval_sec', "Auto Custom interval (seconds)", 1, 3600,
             {'help': "How often to auto-trigger the same Custom action while recording."}),
        ]
        for state_key, label, lowest, highest, extra in numeric_settings:
            state[state_key] = st.number_input(
                label, min_value=lowest, max_value=highest, value=state[state_key], **extra
            )
        state.auto_start_custom = st.checkbox(
            "Auto-start incremental snapshots on record",
            value=state.auto_start_custom,
            help="Start sending Custom intervals automatically as soon as you start recording.",
        )

    # Inject globals into the page for the recorder component to pick up.
    # NOTE(review): assumed to be outside the expander — indentation was lost.
    components.html(f"""
<script>
window.ST_AREC_RETENTION_MINUTES = {int(state.retention_minutes)};
window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS = {int(state.custom_snapshot_seconds)};
window.ST_AREC_LAST_FETCHED_END_MS = {int(state.get('lastFetchedEnd_ms', 0))};
window.ST_AREC_CUSTOM_AUTO_INTERVAL_SECONDS = {int(state.get('auto_custom_interval_sec', 10))};
window.ST_AREC_AUTO_START = {str(bool(state.get('auto_start_custom', True))).lower()};
console.log('Recorder config', window.ST_AREC_RETENTION_MINUTES, window.ST_AREC_CUSTOM_SNAPSHOT_SECONDS);
</script>
""", height=0)

    if AUDIO_PROCESSOR_CLASS is None:
        # The guarded import at module level failed: show why and stop the run.
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()

    step_1_upload_and_process()

    # Always show results if they exist, regardless of step.
    if state.transcription_data:
        step_2_review_and_customize()
| # --- Step 1: Upload and Process --- | |
def _pad_result(result, size):
    """Return *result* as a tuple of exactly *size* items, padded with ``None``.

    Call sites in this file disagree on how many values the processor methods
    return, so results are normalised instead of unpacked directly.
    """
    items = result if isinstance(result, tuple) else (result,)
    items = tuple(items[:size])
    return items + (None,) * (size - len(items))


def _trim_wav_prefix(wav_bytes, sample_rate, drop_ms):
    """Drop the first *drop_ms* milliseconds from a WAV with a 44-byte header.

    Data that does not start with a RIFF/WAVE header is returned unchanged.
    The frame size is read from the header's BlockAlign field when the header
    has the canonical ``fmt `` / ``data`` layout; otherwise 2 bytes per frame
    (PCM16 mono) is assumed. The RIFF and data chunk sizes are rewritten to
    match the shortened payload.
    """
    if drop_ms <= 0 or len(wav_bytes) < 44:
        return wav_bytes
    if wav_bytes[0:4] != b'RIFF' or wav_bytes[8:12] != b'WAVE':
        return wav_bytes

    frame_bytes = 2  # PCM16 mono
    if wav_bytes[12:16] == b'fmt ' and wav_bytes[36:40] == b'data':
        frame_bytes = int.from_bytes(wav_bytes[32:34], 'little') or 2

    drop_bytes = int(sample_rate * (drop_ms / 1000.0)) * frame_bytes
    pcm_trim = wav_bytes[44 + drop_bytes:]  # empty when everything is dropped

    header = bytearray(wav_bytes[:44])
    header[4:8] = (36 + len(pcm_trim)).to_bytes(4, 'little')   # ChunkSize
    header[40:44] = len(pcm_trim).to_bytes(4, 'little')        # Subchunk2Size
    return bytes(header) + pcm_trim


def _append_to_transcript_feed(digest, text):
    """Prepend *text* to the transcript feed once per *digest* and rebuild ``edited_text``."""
    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0, {"id": digest, "ts": int(time.time() * 1000), "text": text}
        )
    st.session_state.edited_text = "\n\n".join(
        entry["text"] for entry in st.session_state.transcript_feed
    )


def _transcribe_interval(wav_bytes):
    """Transcribe *wav_bytes* via a temp file; return ``(text, model_used)``."""
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tf:
            tmp_path = tf.name
            tf.write(wav_bytes)
        processor = AUDIO_PROCESSOR_CLASS()
        word_timestamps, _logs, model_used = _pad_result(
            processor.get_word_timestamps(tmp_path), 3
        )
        full_text = " ".join(d['word'] for d in word_timestamps) if word_timestamps else ""
        # Fallback: if timestamp extraction yielded no words, try plain transcription.
        if not full_text:
            plain_text, _err, fallback_model = _pad_result(
                processor.transcribe_audio(tmp_path), 3
            )
            if plain_text:
                full_text = plain_text.strip()
                model_used = fallback_model
        return full_text, model_used
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


def _promote_pending_summary():
    """Move a summary finished by a background worker into ``arabic_explanation``."""
    mailbox = st.session_state.get('_pending_summary')
    if isinstance(mailbox, dict) and mailbox.get('text'):
        st.session_state.arabic_explanation = mailbox.pop('text')


def _start_segment_enrichment(seg):
    """Translate *seg* and refresh the summary in a background thread.

    ``st.session_state`` is not available to a plain thread (no script-run
    context), so every setting is read here, on the script thread. The worker
    only touches plain Python objects: the segment dict and a mailbox dict
    stored in session state, which ``_promote_pending_summary`` reads on the
    next script run.
    """
    import threading

    text = seg.get('text')
    translate_enabled = bool(st.session_state.get('enable_translation', True))
    target_lang = st.session_state.get('broadcast_translation_lang', 'ar')
    if target_lang == 'detect':
        # 'detect' is a UI choice, not a language code; it means "to Arabic".
        target_lang = 'ar'
    summarize_enabled = bool(st.session_state.get('auto_generate_summary', True))
    summary_lang = st.session_state.get('summary_language', 'ar')
    source_text = " \n".join(
        s.get('text', '') for s in st.session_state.broadcast_segments if s.get('text')
    )
    mailbox = st.session_state.setdefault('_pending_summary', {})

    def worker():
        if text and translate_enabled:
            try:
                tx, _ = get_translator().translate_text(text, target_language=target_lang)
                if tx:
                    seg['translations'][target_lang] = tx
            except Exception:
                # Best-effort: the broadcast panel retries missing translations.
                traceback.print_exc()
        if summarize_enabled and source_text.strip():
            try:
                summary, _ = generate_summary(source_text, target_language=summary_lang)
                if summary:
                    mailbox['text'] = summary
            except Exception:
                traceback.print_exc()

    threading.Thread(target=worker, daemon=True).start()


def _handle_interval_snapshot(payload):
    """Process an ``interval_wav`` payload: trim overlap, de-duplicate, transcribe, store."""
    wav = bytes(payload['bytes'])
    sr = int(payload.get('sr', 16000))
    start_ms = int(payload['start_ms'])
    end_ms = int(payload['end_ms'])

    if end_ms <= start_ms:
        st.warning("The received interval is empty.")
        return

    # Prevent overlap with the previously fetched segment.
    last_end = st.session_state.lastFetchedEnd_ms or 0
    eff_start_ms = max(start_ms, last_end)
    if eff_start_ms >= end_ms:
        st.info("No new parts after the last point.")
        return

    try:
        wav = _trim_wav_prefix(wav, sr, eff_start_ms - start_ms)
    except (ValueError, OverflowError):
        # Keep the untrimmed audio rather than dropping the snapshot.
        pass

    digest = hashlib.md5(wav).hexdigest()
    duplicate = any(
        s.get('checksum') == digest and s.get('start_ms') == eff_start_ms and s.get('end_ms') == end_ms
        for s in st.session_state.broadcast_segments
    )
    if duplicate:
        st.info("Duplicate segment ignored.")
        return

    # Spinner so the user sees a waiting indicator until the text appears.
    with st.spinner("⏳ Extracting text from interval..."):
        full_text, model_used = _transcribe_interval(wav)

    # Append the segment immediately with only the original text.
    seg = {
        'id': digest,
        'recording_id': payload.get('session_id', 'local'),
        'start_ms': eff_start_ms,
        'end_ms': end_ms,
        'checksum': digest,
        'text': full_text,
        'translations': {},
        'transcription_model': model_used,
    }
    st.session_state.broadcast_segments.append(seg)
    st.session_state.broadcast_segments.sort(key=lambda s: s['start_ms'])
    st.session_state.lastFetchedEnd_ms = end_ms
    if full_text:
        _append_to_transcript_feed(digest, full_text)
    st.success(f"Added new segment: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s")

    # Translation and summary are filled in afterwards, off the script thread.
    _start_segment_enrichment(seg)


def _handle_full_recording(wav_audio_data):
    """Legacy payload: treat the component value as a complete WAV recording."""
    bytes_data = bytes(wav_audio_data)
    # This is not the Custom interval mode.
    st.session_state['_custom_active'] = False
    st.session_state.audio_data = bytes_data
    st.audio(bytes_data)
    digest = hashlib.md5(bytes_data).hexdigest()
    last_digest = st.session_state.get('_last_component_digest')
    if st.session_state.auto_process_snapshots and digest != last_digest:
        # Record the digest first so the rerun after processing does not loop.
        st.session_state['_last_component_digest'] = digest
        run_audio_processing(bytes_data, "snapshot.wav")
    elif st.button("📝 Extract Text", type="primary", use_container_width=True):
        st.session_state['_last_component_digest'] = digest
        run_audio_processing(bytes_data, "recorded_audio.wav")


def _render_broadcast_panel():
    """Show all broadcast segments (latest first) with their translations."""
    with st.expander("📻 Broadcast (latest first)", expanded=True):
        # Language selector for broadcast translations.
        try:
            translator = get_translator()
            langs = translator.get_supported_languages()
            codes = list(langs.keys())
            labels = ["detect language — Arabic (العربية)"] + [f"{code} — {langs[code]}" for code in codes]
            current = st.session_state.get('broadcast_translation_lang', 'ar')
            if current not in codes and current != 'detect':
                current = 'detect'
            default_index = 0 if current == 'detect' else codes.index(current) + 1
            sel_label = st.selectbox("Broadcast translation language", labels, index=default_index)
            if sel_label.startswith("detect language"):
                sel_code = 'detect'
            else:
                sel_code = sel_label.split(' — ')[0]
            st.session_state.broadcast_translation_lang = sel_code
        except Exception:
            # Selector unavailable: keep whatever language was chosen before.
            sel_code = st.session_state.get('broadcast_translation_lang', 'ar')

        if not st.session_state.broadcast_segments:
            st.caption("No segments yet. Use the Custom button while recording.")
            return

        # 'detect' mode always translates to Arabic.
        target_lang = 'ar' if sel_code == 'detect' else sel_code
        translate_enabled = st.session_state.get('enable_translation', True)

        for s in sorted(st.session_state.broadcast_segments, key=lambda s: s['start_ms'], reverse=True):
            st.markdown(f"**[{s['start_ms']/1000:.2f}s → {s['end_ms']/1000:.2f}s]**")
            st.write(s.get('text', ''))
            model_note = s.get('transcription_model', None)
            if model_note:
                st.caption(f"Model used: {model_note}")

            if s.get('text') and translate_enabled:
                if not isinstance(s.get('translations'), dict):
                    s['translations'] = {}
                if target_lang not in s['translations']:
                    try:
                        tx, _ = get_translator().translate_text(s.get('text', ''), target_language=target_lang)
                        if tx:
                            s['translations'][target_lang] = tx
                    except Exception:
                        # Best-effort: retried on the next rerun.
                        pass
                if s['translations'].get(target_lang):
                    st.caption(f"Translation ({target_lang.upper()}):")
                    st.write(s['translations'][target_lang])
            st.divider()


def step_1_upload_and_process():
    """Render Step 1: upload or record audio, process it, and show the broadcast feed.

    Upload tab: a chosen file is kept in ``st.session_state.audio_data`` and
    processed on demand through ``run_audio_processing``.

    Record tab: the recorder component either returns a dict payload
    (``type`` of ``interval_wav`` or ``no_new``) for incremental snapshots, or
    raw WAV bytes for a complete recording.
    """
    # Pick up a summary finished by a background worker since the last run.
    _promote_pending_summary()

    st.header("Step 1: Choose Your Audio Source")
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])

    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader("Choose an audio file", type=['mp3', 'wav', 'm4a'], help="Supported formats: MP3, WAV, M4A")
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)
        # NOTE(review): indentation was lost in the scraped source; this reset
        # button is assumed to sit at tab level — confirm against the repository.
        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()

    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Use the ⏪ buttons to snapshot the last seconds without stopping. Processing can run automatically.")
        wav_audio_data = st_audiorec()

        # Auto-process incoming snapshots using the existing flow (no external server).
        st.session_state.setdefault('auto_process_snapshots', True)
        st.checkbox("Auto-process snapshots (keeps recording)", key='auto_process_snapshots', help="When enabled, any snapshot from the recorder is processed immediately using the classic transcription method.")

        if wav_audio_data:
            is_interval_payload = (
                isinstance(wav_audio_data, dict)
                and wav_audio_data.get('type') in ('interval_wav', 'no_new')
            )
            if is_interval_payload:
                # Mark the Custom interval flow active so Step 2 editor/style can be hidden.
                st.session_state['_custom_active'] = True
                if wav_audio_data['type'] == 'no_new':
                    st.info("No new audio chunks yet.")
                else:
                    _handle_interval_snapshot(wav_audio_data)
            else:
                _handle_full_recording(wav_audio_data)

    # Always show the Broadcast view in Step 1 (regardless of transcription_data).
    # NOTE(review): assumed to be outside the tabs — indentation was lost.
    _render_broadcast_panel()
| # --- Step 2: Review and Customize --- | |
def _notify_copied(clipboard=None):
    """Button callback: confirm the copy action with a toast.

    Streamlit invokes ``on_click`` with the button's ``kwargs``, so the
    callback must accept the ``clipboard`` keyword; a zero-argument callable
    raises ``TypeError`` on every click.

    NOTE(review): nothing is written to the system clipboard here — only the
    toast is shown. The text remains selectable in the text area.
    """
    st.toast("Copied to clipboard!")


def step_2_review_and_customize():
    """Render Step 2: original/translated text and the summary section.

    Reads ``st.session_state.transcription_data`` and
    ``st.session_state.broadcast_segments``; writes ``summary_language`` and
    ``arabic_explanation``.
    """
    data = st.session_state.transcription_data
    st.header("✅ Extracted Text & Translation")

    # NOTE(review): indentation was lost in the scraped source; the two text
    # columns are assumed to be shown only when translation succeeded.
    if data.get('translation_success', False):
        st.success(f"🌐 Translation completed! Detected language: {data.get('detected_language', 'N/A')}")
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Original Text")
            # Always the raw extracted text (not translated).
            st.text_area("Original Transcription", value=data.get('text', ''), height=150, key="original_text_area")
            st.button("📋 Copy Original Text", on_click=_notify_copied, args=(), kwargs={'clipboard': data.get('text', '')})
        with col2:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area("Translated Text", value=data.get('translated_text', ''), height=150, key="translated_text_area")
            st.button("📋 Copy Translated Text", on_click=_notify_copied, args=(), kwargs={'clipboard': data.get('translated_text', '')})

    st.divider()
    st.subheader("🧠 Summary")
    st.info("A concise summary tied to the extracted broadcast text with key points and relevant examples.")

    # Summary language selector (default Arabic); best-effort, the previous
    # choice is kept when the translator cannot list its languages.
    try:
        translator = get_translator()
        langs = translator.get_supported_languages()
        codes = list(langs.keys())
        labels = [f"{code} — {langs[code]}" for code in codes]
        cur = st.session_state.get('summary_language', 'ar')
        idx = codes.index(cur) if cur in codes else 0
        sel = st.selectbox("Summary language", labels, index=idx)
        st.session_state.summary_language = sel.split(' — ')[0]
    except Exception:
        pass

    # Source text: broadcast segments first, else the full transcription.
    source_text = ""
    if st.session_state.broadcast_segments:
        source_text = " \n".join(
            s.get('text', '') for s in st.session_state.broadcast_segments if s.get('text')
        )
    elif st.session_state.transcription_data:
        td = st.session_state.transcription_data
        source_text = td.get('text') or td.get('translated_text', '') or ''

    if 'arabic_explanation' not in st.session_state:
        st.session_state.arabic_explanation = None

    colE, colF = st.columns([1, 4])
    with colE:
        if st.button("✍️ Generate summary", use_container_width=True):
            with st.spinner("⏳ Generating bullet-point summary..."):
                explained, err = generate_summary(source_text or '', target_language=st.session_state.get('summary_language', 'ar'))
            if explained:
                st.session_state.arabic_explanation = explained
                st.success("Summary generated successfully.")
            else:
                st.error(err or "Failed to create summary. Please try again.")
    with colF:
        st.text_area("Summary", value=st.session_state.arabic_explanation or "", height=350)
| # --- Step 3: Export --- | |
| # Removed Step 3 export UI and related functions per user request. | |
def reset_session():
    """Drop the per-file session keys and restore their defaults.

    Only the keys listed below are removed; everything else in the session
    (for example the transcript feed and broadcast segments) is left as is.
    """
    log_to_browser_console("--- INFO: Resetting session state. ---")
    resettable = ('step', 'audio_data', 'transcription_data', 'edited_text', 'video_style', 'new_recording')
    present = [name for name in resettable if name in st.session_state]
    for name in present:
        del st.session_state[name]
    initialize_session_state()
| # --- Entry Point --- | |
# Run the app only when executed as a script and only once the API key check
# has passed (the check renders its own instructions otherwise).
if __name__ == "__main__" and check_api_key():
    initialize_session_state()
    main()