# app.py - Refactored to eliminate recorder_server.py dependency
"""SyncMaster Streamlit app.

Record or upload audio, transcribe it (optionally with AI translation),
review/edit the transcript, and export synced MP3/MP4 media.
"""

import streamlit as st
import os
import tempfile
import json
from pathlib import Path
import time
import traceback
import streamlit.components.v1 as components
import hashlib
# from st_audiorec import st_audiorec  # Import the new recorder component - OLD

# Reduce metrics/usage writes that can cause permission errors on hosted environments
try:
    st.set_option('browser.gatherUsageStats', False)
except Exception:
    pass

# Robust component declaration: prefer local build, else fall back to pip package
parent_dir = os.path.dirname(os.path.abspath(__file__))
build_dir = os.path.join(parent_dir, "custom_components/st-audiorec/st_audiorec/frontend/build")


def st_audiorec(key=None):
    """Return audio recorder component value, trying local build first, then pip package fallback.

    The local build returns either raw WAV bytes (legacy shape) or an
    interval-payload dict; see step_1_upload_and_process for both shapes.
    Returns None when no recorder implementation is available.
    """
    try:
        if os.path.isdir(build_dir):
            _component_func = components.declare_component("st_audiorec", path=build_dir)
            return _component_func(key=key, default=0)
        # Fallback to pip-installed component if available
        try:
            from st_audiorec import st_audiorec as st_audiorec_pkg
            return st_audiorec_pkg(key=key)
        except Exception:
            st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
            return None
    except Exception:
        # Final safety net
        st.warning("Failed to initialize audio recorder component.")
        return None


# --- Critical Imports and Initial Checks ---
# AudioProcessor is imported defensively so a broken dependency surfaces as a
# readable traceback in the UI (see main()) instead of an opaque crash.
AUDIO_PROCESSOR_CLASS = None
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()

from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS
import requests
from dotenv import load_dotenv


# --- API Key Check ---
def check_api_key():
    """Check for Gemini API key and display instructions if not found.

    Returns:
        bool: True when GEMINI_API_KEY is present in the environment
        (after loading .env), False otherwise (with setup instructions
        rendered in the UI).
    """
    load_dotenv()
    if not os.getenv("GEMINI_API_KEY"):
        st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
        st.info("To fix this, please follow these steps:")
        st.markdown("""
1. **Find the file named `.env.example`** in the `syncmaster2` directory.
2. **Rename it to `.env`**.
3. **Open the `.env` file** with a text editor.
4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
6. **Save the file and restart the application.**
""")
        return False
    return True


# --- Page Configuration ---
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"
)


# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Injects JavaScript to log messages to the browser's console.

    Accepts a single string or a list of strings; each entry is JSON-escaped
    and emitted as one console.log() call inside a zero-height component
    iframe (iframe console output still reaches the shared browser console).
    """
    if isinstance(messages, str):
        messages = [messages]
    escaped_messages = [json.dumps(str(msg)) for msg in messages]
    # FIX: the script payload was missing (empty f-string), which made this
    # function a silent no-op even though the whole app relies on it for
    # diagnostics. Restore the console.log injection.
    log_statements = "\n".join(f"console.log({msg});" for msg in escaped_messages)
    js_code = f"<script>{log_statements}</script>"
    components.html(js_code, height=0, scrolling=False)


# --- Session State Initialization ---
def initialize_session_state():
    """Initializes the session state variables if they don't exist."""
    if 'step' not in st.session_state:
        st.session_state.step = 1
    if 'audio_data' not in st.session_state:
        st.session_state.audio_data = None
    if 'language' not in st.session_state:
        st.session_state.language = 'en'
    if 'enable_translation' not in st.session_state:
        st.session_state.enable_translation = True
    if 'target_language' not in st.session_state:
        st.session_state.target_language = 'ar'
    if 'transcription_data' not in st.session_state:
        st.session_state.transcription_data = None
    if 'edited_text' not in st.session_state:
        st.session_state.edited_text = ""
    if 'video_style' not in st.session_state:
        st.session_state.video_style = {
            'animation_style': 'Karaoke Style',
            'text_color': '#FFFFFF',
            'highlight_color': '#FFD700',
            'background_color': '#000000',
            'font_family': 'Arial',
            'font_size': 48
        }
    if 'new_recording' not in st.session_state:
        st.session_state.new_recording = None
    # Transcript feed (prepend latest) and dedupe set
    if 'transcript_feed' not in st.session_state:
        st.session_state.transcript_feed = []  # list of {id, ts, text}
    if 'transcript_ids' not in st.session_state:
        st.session_state.transcript_ids = set()
    # Incremental broadcast state
    if 'broadcast_segments' not in st.session_state:
        # [{id, recording_id, start_ms, end_ms, checksum, text}]
        st.session_state.broadcast_segments = []
    if 'lastFetchedEnd_ms' not in st.session_state:
        st.session_state.lastFetchedEnd_ms = 0
    # Broadcast translation language (separate from general UI translation target)
    if 'broadcast_translation_lang' not in st.session_state:
        st.session_state.broadcast_translation_lang = 'ar'


def _record_transcript_snippet(audio_bytes, text):
    """Prepend *text* to the transcript feed, deduped by an MD5 digest of
    *audio_bytes*, and rebuild edited_text newest-first.

    Extracted helper: this exact prepend/dedupe block previously appeared
    three times (both run_audio_processing branches and the interval path).
    """
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        # Fall back to a timestamp pseudo-id if hashing fails
        digest = f"snap-{int(time.time()*1000)}"
    if digest not in st.session_state.transcript_ids:
        st.session_state.transcript_ids.add(digest)
        st.session_state.transcript_feed.insert(
            0,
            {"id": digest, "ts": int(time.time() * 1000), "text": text},
        )
        # Rebuild edited_text with newest first
        st.session_state.edited_text = "\n\n".join(
            s["text"] for s in st.session_state.transcript_feed
        )


# --- Centralized Audio Processing Function ---
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """
    A single, robust function to handle all audio processing.
    Takes audio bytes as input and stores the processed results in
    st.session_state.transcription_data (plus the transcript feed).
    """
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return
    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        # The processor works on a file path, so spill the bytes to a temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(original_filename).suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name
        processor = AUDIO_PROCESSOR_CLASS()
        # Determine which processing path to take
        if st.session_state.enable_translation:
            with st.spinner("⏳ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language,
                )
            log_to_browser_console(processor_logs)
            if not result_data or not result_data.get("original_text"):
                st.warning("Could not generate transcription with translation. Check browser console (F12) for logs.")
                return
            st.session_state.transcription_data = {
                "text": result_data["original_text"],
                "translated_text": result_data["translated_text"],
                "word_timestamps": result_data["word_timestamps"],
                "audio_bytes": audio_bytes,
                "original_suffix": Path(original_filename).suffix,
                "translation_success": result_data.get("translation_success", False),
                "detected_language": result_data.get("language_detected", "unknown"),
            }
            _record_transcript_snippet(audio_bytes, result_data["original_text"])
        else:
            # Standard processing without translation
            with st.spinner("⏳ Performing AI Transcription... please wait."):
                word_timestamps, processor_logs = processor.get_word_timestamps(tmp_file_path)
            log_to_browser_console(processor_logs)
            if not word_timestamps:
                st.warning("Could not generate timestamps. Check browser console (F12) for logs.")
                return
            full_text = " ".join(d["word"] for d in word_timestamps)
            st.session_state.transcription_data = {
                "text": full_text,
                "word_timestamps": word_timestamps,
                "audio_bytes": audio_bytes,
                "original_suffix": Path(original_filename).suffix,
                "translation_success": False,
            }
            _record_transcript_snippet(audio_bytes, full_text)
        st.session_state.step = 1  # Keep it on the same step
        st.success("🎉 AI processing complete! Results are shown below.")
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
        # NOTE(review): the rerun also fires after an exception, so the error
        # shown above is only visible for ~1s before the page refreshes.
        time.sleep(1)
        st.rerun()


# --- Main Application Logic ---
def main():
    """Top-level page layout: sidebar settings, step header, and step panels."""
    initialize_session_state()
    # NOTE(review): the CSS payload of this call appears to have been
    # stripped from the source; left empty rather than inventing styles.
    st.markdown(""" """, unsafe_allow_html=True)
    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        language_options = {'English': 'en', 'العربية': 'ar'}
        selected_lang_display = st.selectbox(
            "Interface Language",
            options=list(language_options.keys()),
            index=0 if st.session_state.language == 'en' else 1
        )
        st.session_state.language = language_options[selected_lang_display]
        st.markdown("## 🔤 Translation Settings")
        st.session_state.enable_translation = st.checkbox(
            "Enable AI Translation" if st.session_state.language == 'en' else "تفعيل الترجمة بالذكاء الاصطناعي",
            value=st.session_state.enable_translation,
            help="Automatically translate transcribed text" if st.session_state.language == 'en' else "ترجمة النص تلقائياً"
        )
        if st.session_state.enable_translation:
            target_lang_options = {
                'Arabic (العربية)': 'ar',
                'English': 'en',
                'French (Français)': 'fr',
                'Spanish (Español)': 'es'
            }
            selected_target = st.selectbox(
                "Target Language" if st.session_state.language == 'en' else "اللغة المستهدفة",
                options=list(target_lang_options.keys()),
                index=0
            )
            st.session_state.target_language = target_lang_options[selected_target]
    st.title("🎵 SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")
    # Step progress indicators
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown(f"**{'✅' if st.session_state.step >= 1 else '1️⃣'} Step 1: Upload & Process**")
    with col2:
        st.markdown(f"**{'✅' if st.session_state.step >= 2 else '2️⃣'} Step 2: Review & Customize**")
    with col3:
        st.markdown(f"**{'✅' if st.session_state.step >= 3 else '3️⃣'} Step 3: Export**")
    st.divider()
    # Global settings for long recording retention and custom snapshot duration
    with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False):
        st.session_state.setdefault('retention_minutes', 30)
        # 0 means: use full buffer by default for Custom
        st.session_state.setdefault('custom_snapshot_seconds', 0)
        st.session_state.retention_minutes = st.number_input(
            "Retention window (minutes)", min_value=5, max_value=240,
            value=st.session_state.retention_minutes)
        st.session_state.custom_snapshot_seconds = st.number_input(
            "Custom snapshot (seconds; 0 = full buffer)", min_value=0, max_value=3600,
            value=st.session_state.custom_snapshot_seconds)
    # Inject globals into the page for the component to pick up
    # NOTE(review): the HTML payload here appears stripped from the source, so
    # this call is currently a no-op and the recorder cannot read the settings
    # above — confirm against the original file.
    components.html(f""" """, height=0)
    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()
    step_1_upload_and_process()
    # Always show results if they exist, regardless of step
    if st.session_state.transcription_data:
        step_2_review_and_customize()
        step_3_export()


def _trim_leading_wav(b, sr, delta_ms):
    """Drop *delta_ms* of leading audio from WAV bytes *b* (assumes a plain
    44-byte RIFF/WAVE header with PCM16 mono data) and fix the header sizes.

    Returns *b* unchanged when delta_ms <= 0 or the header is not a
    recognizable RIFF/WAVE layout.
    """
    if delta_ms <= 0:
        return b
    if not (len(b) >= 44 and b[0:4] == b'RIFF' and b[8:12] == b'WAVE'):
        # Not a recognizable WAV header; keep as-is
        return b
    bytes_per_sample = 2  # PCM16 mono
    drop_samples = int(sr * (delta_ms / 1000.0))
    drop_bytes = drop_samples * bytes_per_sample
    pcm = b[44:]
    pcm_trim = pcm[drop_bytes:] if drop_bytes < len(pcm) else b''
    new_data_size = len(pcm_trim)
    # Rebuild header sizes
    header = bytearray(b[:44])
    # ChunkSize at offset 4 = 36 + Subchunk2Size
    header[4:8] = (36 + new_data_size).to_bytes(4, 'little')
    # Subchunk2Size at offset 40
    header[40:44] = new_data_size.to_bytes(4, 'little')
    return bytes(header) + pcm_trim


def _process_interval_payload(payload):
    """Handle an 'interval_wav' / 'no_new' dict from the recorder component:
    trim overlap with the last fetched point, transcribe the new slice,
    optionally translate it, and append it to the broadcast feed."""
    if payload['type'] == 'no_new':
        st.info("لا توجد أجزاء جديدة.")
        return
    if payload['type'] != 'interval_wav':
        return
    # Extract interval audio
    b = bytes(payload['bytes'])
    sr = int(payload.get('sr', 16000))
    start_ms = int(payload['start_ms'])
    end_ms = int(payload['end_ms'])
    if end_ms <= start_ms:
        st.warning("الجزء المُرسل فارغ.")
        return
    # Prevent overlap with prior segment
    last_end = st.session_state.lastFetchedEnd_ms or 0
    eff_start_ms = max(start_ms, last_end)
    if eff_start_ms >= end_ms:
        st.info("لا توجد أجزاء جديدة بعد آخر نقطة.")
        return
    # If there is overlap, trim the audio bytes accordingly (best effort)
    try:
        b = _trim_leading_wav(b, sr, eff_start_ms - start_ms)
    except Exception:
        pass
    # Compute checksum; skip if identical checksum and same window
    digest = hashlib.md5(b).hexdigest()
    exists = any(
        s.get('checksum') == digest
        and s.get('start_ms') == eff_start_ms
        and s.get('end_ms') == end_ms
        for s in st.session_state.broadcast_segments
    )
    if exists:
        st.info("تم تجاهل جزء مكرر.")
        return
    # Run standard pipeline to get text (no translation to keep it light)
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tf:
        tf.write(b)
        tmp_path = tf.name
    try:
        processor = AUDIO_PROCESSOR_CLASS()
        word_timestamps, processor_logs = processor.get_word_timestamps(tmp_path)
        full_text = " ".join(d['word'] for d in word_timestamps) if word_timestamps else ""
    finally:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
    # Auto-translate to selected language if enabled (best effort)
    translations = {}
    try:
        if full_text and st.session_state.get('enable_translation', True):
            translator = get_translator()
            sel_lang = st.session_state.get('broadcast_translation_lang', 'ar')
            tx, _ = translator.translate_text(full_text, target_language=sel_lang)
            if tx:
                translations[sel_lang] = tx
    except Exception:
        pass
    # Append segment, keep the feed sorted by time, advance the fetch cursor
    seg = {
        'id': digest,
        'recording_id': payload.get('session_id', 'local'),
        'start_ms': eff_start_ms,
        'end_ms': end_ms,
        'checksum': digest,
        'text': full_text,
        'translations': translations,
    }
    st.session_state.broadcast_segments.append(seg)
    st.session_state.broadcast_segments.sort(key=lambda s: s['start_ms'])
    st.session_state.lastFetchedEnd_ms = end_ms
    # Also update transcript feed (newest first) and edited text so the user
    # sees output immediately
    if full_text:
        _record_transcript_snippet(b, full_text)
    st.success(f"تم إضافة جزء جديد: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s")


def _render_broadcast_segments(sel_code):
    """Render broadcast segments chronologically, lazily filling in a cached
    translation for *sel_code* on each segment (shared by steps 1 and 2)."""
    if not st.session_state.broadcast_segments:
        st.caption("لا توجد أجزاء بعد. استخدم زر Custom أثناء التسجيل.")
        return
    for s in st.session_state.broadcast_segments:
        st.markdown(f"**[{s['start_ms']/1000:.2f}s → {s['end_ms']/1000:.2f}s]**")
        st.write(s.get('text', ''))
        # Ensure and show translation in selected language
        if s.get('text') and st.session_state.get('enable_translation', True):
            if 'translations' not in s or not isinstance(s.get('translations'), dict):
                s['translations'] = {}
            if sel_code not in s['translations']:
                try:
                    tx, _ = get_translator().translate_text(s.get('text', ''), target_language=sel_code)
                    if tx:
                        s['translations'][sel_code] = tx
                except Exception:
                    pass
            if s['translations'].get(sel_code):
                st.caption(f"الترجمة ({sel_code}):")
                st.write(s['translations'][sel_code])
        st.divider()


# --- Step 1: Upload and Process ---
def step_1_upload_and_process():
    """Audio source selection: file upload or live microphone recording."""
    st.header("Step 1: Choose Your Audio Source")
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])
    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader(
            "Choose an audio file", type=['mp3', 'wav', 'm4a'],
            help="Supported formats: MP3, WAV, M4A")
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)
        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()
    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Use the ⏪ buttons to snapshot the last seconds without stopping. Processing can run automatically.")
        # Use the audio recorder component
        wav_audio_data = st_audiorec()
        # Auto-process incoming snapshots using the existing flow (no external server)
        st.session_state.setdefault('auto_process_snapshots', True)
        st.checkbox(
            "Auto-process snapshots (keeps recording)",
            key='auto_process_snapshots',
            help="When enabled, any snapshot from the recorder is processed immediately using the classic transcription method.")
        if wav_audio_data:
            # Two possible payload shapes: raw bytes array (legacy) or interval payload dict
            if isinstance(wav_audio_data, dict) and wav_audio_data.get('type') in ('interval_wav', 'no_new'):
                _process_interval_payload(wav_audio_data)
            else:
                # Legacy: treat as full wav bytes
                bytes_data = bytes(wav_audio_data)
                st.session_state.audio_data = bytes_data
                st.audio(bytes_data)
                digest = hashlib.md5(bytes_data).hexdigest()
                last_digest = st.session_state.get('_last_component_digest')
                if st.session_state.auto_process_snapshots and digest != last_digest:
                    st.session_state['_last_component_digest'] = digest
                    run_audio_processing(bytes_data, "snapshot.wav")
                else:
                    if st.button("📝 Extract Text", type="primary", use_container_width=True):
                        st.session_state['_last_component_digest'] = digest
                        run_audio_processing(bytes_data, "recorded_audio.wav")
    # Simplified: removed external live slice server UI to avoid complexity
    # Always show Broadcast view in Step 1 as well (regardless of transcription_data)
    with st.expander("📻 Broadcast (chronological)", expanded=True):
        # Language selector for broadcast translations
        try:
            translator = get_translator()
            langs = translator.get_supported_languages()
            codes = list(langs.keys())
            labels = [f"{code} — {langs[code]}" for code in codes]
            current = st.session_state.get('broadcast_translation_lang', 'ar')
            default_index = codes.index(current) if current in codes else 0
            sel_label = st.selectbox("Broadcast translation language", labels, index=default_index)
            sel_code = sel_label.split(' — ')[0]
            st.session_state.broadcast_translation_lang = sel_code
        except Exception:
            sel_code = st.session_state.get('broadcast_translation_lang', 'ar')
        _render_broadcast_segments(sel_code)
    # Note: external live slice helper removed to keep the app simple and fully local


# --- Step 2: Review and Customize ---
def step_2_review_and_customize():
    """Review transcription/translation, edit text, and tune video style."""
    st.header("✅ Extracted Text & Translation")
    td = st.session_state.transcription_data
    # Display translation results if available
    if td.get('translation_success', False):
        st.success(f"🌐 Translation completed! Detected language: {td.get('detected_language', 'N/A')}")
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Original Text")
            st.text_area("Original Transcription", value=td['text'], height=150, key="original_text_area")
            # FIX: the original passed kwargs={'clipboard': ...} to a
            # zero-argument lambda, raising TypeError on every click.
            # Streamlit cannot write to the OS clipboard server-side anyway,
            # so only the toast feedback is kept.
            st.button("📋 Copy Original Text", on_click=lambda: st.toast("Copied to clipboard!"))
        with col2:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area("Translated Text", value=td['translated_text'], height=150, key="translated_text_area")
            st.button("📋 Copy Translated Text", on_click=lambda: st.toast("Copied to clipboard!"))
    # Broadcast view (chronological)
    with st.expander("📻 Broadcast (chronological)", expanded=True):
        # Use the same selected language from Step 1 (do not duplicate selector to avoid state conflicts)
        sel_code = st.session_state.get('broadcast_translation_lang', 'ar')
        _render_broadcast_segments(sel_code)
    # Show transcript feed (newest first) with simple dedupe behavior
    with st.expander("🧾 Transcript Feed (latest first)", expanded=True):
        if st.session_state.transcript_feed:
            for i, s in enumerate(st.session_state.transcript_feed):
                st.markdown(f"**Snippet {i+1}** — id: `{s['id']}`")
                st.write(s['text'])
                st.divider()
        else:
            st.caption("No transcript snippets yet. Take a snapshot to populate this feed.")
    col1, col2 = st.columns([3, 2])
    with col1:
        st.subheader("📝 Text Editor")
        st.info("Edit the original transcribed text below. This text will be used for the final export.")
        edited_text = st.text_area("Transcribed Text", value=st.session_state.edited_text, height=300)
        st.session_state.edited_text = edited_text
    with col2:
        st.subheader("🎨 Video Style Customization")
        st.session_state.video_style['animation_style'] = st.selectbox("Animation Style", ["Karaoke Style", "Pop-up Word"])
        st.session_state.video_style['text_color'] = st.color_picker("Text Color", st.session_state.video_style['text_color'])
        st.session_state.video_style['highlight_color'] = st.color_picker("Highlight Color", st.session_state.video_style['highlight_color'])
    # Remove navigation buttons
    st.divider()
    st.subheader("🧠 شرح تعليمي بالعربية (غير ترجمة حرفية)")
    st.info("هذا القسم يقدّم شرحًا تفصيليًا ومنظمًا باللغة العربية للمحتوى، مع أمثلة ونقاط مُلخّصة، ليس ترجمة حرفية.")
    # Determine source for explanation: prefer Arabic translation if available, else original
    if td.get('translation_success') and td.get('translated_text'):
        source_text = td['translated_text']
    else:
        source_text = td.get('text', '')
    if 'arabic_explanation' not in st.session_state:
        st.session_state.arabic_explanation = None
    colE, colF = st.columns([1, 4])
    with colE:
        if st.button("✍️ توليد الشرح بالعربية", use_container_width=True):
            translator = get_translator()
            with st.spinner("⏳ جاري توليد الشرح التفصيلي بالعربية..."):
                explained, err = translator.explain_text_arabic(source_text or '')
            if explained:
                st.session_state.arabic_explanation = explained
                st.success("تم إنشاء الشرح بنجاح.")
            else:
                st.error(err or "تعذّر إنشاء الشرح. حاول مجددًا.")
    with colF:
        st.text_area("الشرح العربي التفصيلي", value=st.session_state.arabic_explanation or "", height=350)


# --- Step 3: Export ---
def step_3_export():
    """Offer MP3 (synced lyrics) and MP4 (video) export actions."""
    st.header("⬇️ Export Your Media")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("🎵 MP3 Export")
        if st.button("📱 Export MP3 with Lyrics", type="primary", use_container_width=True):
            export_mp3()
    with col2:
        st.subheader("🎬 MP4 Video Export")
        if st.button("🎥 Generate Video Summary", type="primary", use_container_width=True):
            export_mp4()
    st.divider()
    # "Record Again" functionality is now handled by the component itself.


# --- MP3 Export Function ---
def export_mp3():
    """Embed SYLT synced lyrics into the processed audio and offer a download."""
    audio_path_for_export = None
    log_to_browser_console("--- INFO: Starting MP3 export process. ---")
    try:
        with st.spinner("⏳ Exporting MP3... Please wait, this may take a moment."):
            suffix = st.session_state.transcription_data['original_suffix']
            audio_bytes = st.session_state.transcription_data['audio_bytes']
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_audio_file:
                tmp_audio_file.write(audio_bytes)
                audio_path_for_export = tmp_audio_file.name
            embedder = MP3Embedder()
            word_timestamps = st.session_state.transcription_data['word_timestamps']
            output_filename = f"synced_{Path(st.session_state.transcription_data.get('original_filename', 'audio')).stem}.mp3"
            output_path, log_messages = embedder.embed_sylt_lyrics(
                audio_path_for_export,
                word_timestamps,
                st.session_state.edited_text,
                output_filename
            )
            log_to_browser_console(log_messages)
        st.subheader("✅ Export Complete")
        if os.path.exists(output_path):
            with open(output_path, 'rb') as f:
                audio_bytes_to_download = f.read()
            st.audio(audio_bytes_to_download, format='audio/mp3')
            verification = embedder.verify_sylt_embedding(output_path)
            if verification.get('has_sylt'):
                st.success(f"Successfully embedded {verification.get('sylt_entries', 0)} words!")
            else:
                st.warning("Warning: Could not verify SYLT embedding.")
            st.download_button("Download Synced MP3", audio_bytes_to_download,
                               output_filename, "audio/mpeg", use_container_width=True)
        else:
            st.error("Failed to create the final MP3 file.")
    except Exception as e:
        st.error(f"An unexpected error occurred during MP3 export: {e}")
        log_to_browser_console(f"--- FATAL ERROR in export_mp3: {traceback.format_exc()} ---")
    finally:
        if audio_path_for_export and os.path.exists(audio_path_for_export):
            os.unlink(audio_path_for_export)


# --- Placeholder and Utility Functions ---
def export_mp4():
    """Placeholder: MP4 export is not implemented yet."""
    with st.spinner("⏳ Preparing video export..."):
        time.sleep(1)  # Simulate work to provide feedback
    st.info("MP4 export functionality is not yet implemented.")


def reset_session():
    """Resets the session state by clearing specific keys and re-initializing."""
    log_to_browser_console("--- INFO: Resetting session state. ---")
    # NOTE(review): transcript_feed / transcript_ids / broadcast_segments are
    # deliberately NOT cleared here (original behavior preserved) — confirm
    # whether that is intentional.
    keys_to_clear = ['step', 'audio_data', 'transcription_data', 'edited_text',
                     'video_style', 'new_recording']
    for key in keys_to_clear:
        if key in st.session_state:
            del st.session_state[key]
    initialize_session_state()


# --- Entry Point ---
if __name__ == "__main__":
    if check_api_key():
        initialize_session_state()
        main()