# app.py - Refactored to eliminate recorder_server.py dependency
import json
import os
import tempfile
import time
import traceback
from pathlib import Path

import streamlit as st
import streamlit.components.v1 as components
from dotenv import load_dotenv
from st_audiorec import st_audiorec  # Import the new recorder component

# --- Critical Imports and Initial Checks ---
# The AudioProcessor import is guarded so that a failure can be rendered in the
# UI (see main()) instead of crashing the script at import time.
AUDIO_PROCESSOR_CLASS = None
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()

from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS


# --- API Key Check ---
def check_api_key():
    """Check for the Gemini API key and display instructions if not found.

    Loads variables from a ``.env`` file via ``load_dotenv()`` and then looks
    for ``GEMINI_API_KEY`` in the environment.

    Returns:
        bool: True when the key is set, False otherwise (after rendering
        setup instructions in the page).
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True

    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    # Built from explicit lines (no leading indentation) so the numbered list
    # renders as a list regardless of how the markdown body is dedented.
    st.markdown(
        "1. **Find the file named `.env.example`** in the `syncmaster2` directory.\n"
        "2. **Rename it to `.env`**.\n"
        "3. **Open the `.env` file** with a text editor.\n"
        "4. **Get your free API key** from "
        "[Google AI Studio](https://aistudio.google.com/app/apikey).\n"
        '5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.\n'
        "6. **Save the file and restart the application.**\n"
    )
    return False


# --- Page Configuration ---
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"
)


# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Inject JavaScript that logs messages to the browser's console.

    Args:
        messages: A single string or an iterable of values; each value is
            converted with ``str()`` before being logged. ``None`` or an
            empty iterable is a no-op.
    """
    if isinstance(messages, str):
        messages = [messages]
    if not messages:
        return

    # json.dumps produces a valid JavaScript string literal. "</" is escaped so
    # a message containing "</script>" cannot terminate the script element.
    escaped_messages = [json.dumps(str(msg)).replace("</", "<\\/") for msg in messages]
    js_code = f"""
    <script>
    (function() {{
        const messages = [{", ".join(escaped_messages)}];
        messages.forEach(function(msg) {{ console.log(msg); }});
    }})();
    </script>
    """
    components.html(js_code, height=0, scrolling=False)


# --- Session State Initialization ---
def initialize_session_state():
    """Populate ``st.session_state`` with defaults for any missing keys.

    Existing values are never overwritten, so this is safe to call on every
    script run.
    """
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style',
            'text_color': '#FFFFFF',
            'highlight_color': '#FFD700',
            'background_color': '#000000',
            'font_family': 'Arial',
            'font_size': 48
        },
        'new_recording': None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


# --- Centralized Audio Processing Function ---
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """Transcribe (and optionally translate) audio and store the result.

    The audio is written to a temporary file, handed to the audio processor,
    and the outcome is stored in ``st.session_state.transcription_data``.
    On success the app advances to step 2 and the script is rerun. On a
    warning or error the message stays on screen (no rerun), so the user can
    read it.

    Args:
        audio_bytes: Raw audio content. Falsy input is rejected with an error.
        original_filename: Name used to derive the temp-file suffix; it is
            also stored so the export step can name its output after it.

    Returns:
        None. Results are communicated through ``st.session_state``.
    """
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return

    suffix = Path(original_filename).suffix
    tmp_file_path = None
    succeeded = False
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name

        processor = AUDIO_PROCESSOR_CLASS()

        # Determine which processing path to take
        if st.session_state.enable_translation:
            with st.spinner("⏳ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language
                )
            log_to_browser_console(processor_logs)

            if not result_data or not result_data.get('original_text'):
                st.warning("Could not generate transcription with translation. Check browser console (F12) for logs.")
                return

            st.session_state.transcription_data = {
                'text': result_data['original_text'],
                'translated_text': result_data['translated_text'],
                'word_timestamps': result_data['word_timestamps'],
                'audio_bytes': audio_bytes,
                'original_suffix': suffix,
                # Read by export_mp3() to name the exported file.
                'original_filename': original_filename,
                'translation_success': result_data.get('translation_success', False),
                'detected_language': result_data.get('language_detected', 'unknown')
            }
            st.session_state.edited_text = result_data['original_text']
        else:
            # Standard processing without translation
            with st.spinner("⏳ Performing AI Transcription... please wait."):
                word_timestamps, processor_logs = processor.get_word_timestamps(tmp_file_path)
            log_to_browser_console(processor_logs)

            if not word_timestamps:
                st.warning("Could not generate timestamps. Check browser console (F12) for logs.")
                return

            full_text = " ".join(d['word'] for d in word_timestamps)
            st.session_state.transcription_data = {
                'text': full_text,
                'word_timestamps': word_timestamps,
                'audio_bytes': audio_bytes,
                'original_suffix': suffix,
                # Read by export_mp3() to name the exported file.
                'original_filename': original_filename,
                'translation_success': False
            }
            st.session_state.edited_text = full_text

        st.session_state.step = 2
        st.success("🎉 AI processing complete! Please review the results.")
        succeeded = True
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)

    # Rerun only after success: rerunning unconditionally would wipe the
    # warning/error messages rendered above about a second after they appear.
    if succeeded:
        time.sleep(1)  # Let the success message be seen before the page redraws
        st.rerun()


# --- Main Application Logic ---
def main():
    """Render the sidebar, header and progress row, then dispatch to the current step."""
    initialize_session_state()

    # NOTE(review): this HTML payload is blank in this revision — presumably
    # custom CSS was intended here; confirm against the original file.
    st.markdown(""" """, unsafe_allow_html=True)

    is_english = st.session_state.language == 'en'

    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        language_options = {'English': 'en', 'العربية': 'ar'}
        selected_lang_display = st.selectbox(
            "Interface Language",
            options=list(language_options.keys()),
            index=0 if is_english else 1
        )
        st.session_state.language = language_options[selected_lang_display]

        st.markdown("## 🔤 Translation Settings")
        st.session_state.enable_translation = st.checkbox(
            "Enable AI Translation" if st.session_state.language == 'en' else "تفعيل الترجمة بالذكاء الاصطناعي",
            value=st.session_state.enable_translation,
            help="Automatically translate transcribed text" if st.session_state.language == 'en' else "ترجمة النص تلقائياً"
        )

        if st.session_state.enable_translation:
            target_lang_options = {
                'Arabic (العربية)': 'ar',
                'English': 'en',
                'French (Français)': 'fr',
                'Spanish (Español)': 'es'
            }
            selected_target = st.selectbox(
                "Target Language" if st.session_state.language == 'en' else "اللغة المستهدفة",
                options=list(target_lang_options.keys()),
                index=0
            )
            st.session_state.target_language = target_lang_options[selected_target]

    st.title("🎵 SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")

    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown(f"**{'✅' if st.session_state.step >= 1 else '1️⃣'} Step 1: Upload & Process**")
    with col2:
        st.markdown(f"**{'✅' if st.session_state.step >= 2 else '2️⃣'} Step 2: Review & Customize**")
    with col3:
        st.markdown(f"**{'✅' if st.session_state.step >= 3 else '3️⃣'} Step 3: Export**")

    st.divider()

    # Surface the guarded import failure from the top of the file and halt.
    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()

    if st.session_state.step == 1:
        step_1_upload_and_process()
    elif st.session_state.step == 2:
        step_2_review_and_customize()
    elif st.session_state.step == 3:
        step_3_export()


# --- Step 1: Upload and Process ---
def step_1_upload_and_process():
    """Render step 1: upload a file or record audio, then start processing."""
    st.header("Step 1: Choose Your Audio Source")

    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])

    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader(
            "Choose an audio file",
            type=['mp3', 'wav', 'm4a'],
            help="Supported formats: MP3, WAV, M4A"
        )
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)

        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()

    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Click the stop icon when you are finished. Processing will begin automatically.")

        # Use the audio recorder component
        wav_audio_data = st_audiorec()

        # Comparing against the stored recording ensures a given recording is
        # processed once, not again on every subsequent script run.
        if wav_audio_data is not None and st.session_state.new_recording != wav_audio_data:
            st.session_state.new_recording = wav_audio_data
            # Immediately process the new recording
            run_audio_processing(st.session_state.new_recording, "recorded_audio.wav")


# --- Step 2: Review and Customize ---
def step_2_review_and_customize():
    """Render step 2: show the transcription, allow text edits and style choices."""
    st.header("Step 2: Review & Customize")

    if 'transcription_data' not in st.session_state or not st.session_state.transcription_data:
        st.error("No data found. Please return to Step 1.")
        if st.button("← Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return

    # Display translation results if available
    if st.session_state.transcription_data.get('translation_success', False):
        st.success(f"🌐 Translation completed! Detected language: {st.session_state.transcription_data.get('detected_language', 'N/A')}")

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Original Text")
            st.text_area(
                "Original Transcription",
                value=st.session_state.transcription_data['text'],
                height=150,
                disabled=True
            )
        with col2:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area(
                "Translated Text",
                value=st.session_state.transcription_data['translated_text'],
                height=150,
                disabled=True
            )

    col1, col2 = st.columns([3, 2])
    with col1:
        st.subheader("📝 Text Editor")
        st.info("Edit the original transcribed text below. This text will be used for the final export.")
        edited_text = st.text_area("Transcribed Text", value=st.session_state.edited_text, height=300)
        st.session_state.edited_text = edited_text

    with col2:
        st.subheader("🎨 Video Style Customization")
        # Seed the selectbox from the stored style so the choice survives a
        # round trip to another step instead of resetting to the first option.
        animation_options = ["Karaoke Style", "Pop-up Word"]
        current_animation = st.session_state.video_style.get('animation_style')
        animation_index = (
            animation_options.index(current_animation)
            if current_animation in animation_options
            else 0
        )
        st.session_state.video_style['animation_style'] = st.selectbox(
            "Animation Style", animation_options, index=animation_index
        )
        st.session_state.video_style['text_color'] = st.color_picker(
            "Text Color", st.session_state.video_style['text_color']
        )
        st.session_state.video_style['highlight_color'] = st.color_picker(
            "Highlight Color", st.session_state.video_style['highlight_color']
        )

    col1_nav, _, col3_nav = st.columns([1, 2, 1])
    with col1_nav:
        if st.button("← Back to Upload"):
            st.session_state.step = 1
            st.rerun()
    with col3_nav:
        if st.button("Continue to Export →", type="primary"):
            st.session_state.step = 3
            st.rerun()


# --- Step 3: Export ---
def step_3_export():
    """Render step 3: MP3/MP4 export buttons and navigation."""
    st.header("Step 3: Export Your Synchronized Media")

    if 'transcription_data' not in st.session_state or not st.session_state.transcription_data:
        st.error("No data found. Please return to Step 1.")
        if st.button("← Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return

    col1, col2 = st.columns(2)
    with col1:
        st.subheader("🎵 MP3 Export")
        if st.button("📱 Export MP3 with Lyrics", type="primary", use_container_width=True):
            export_mp3()
    with col2:
        st.subheader("🎬 MP4 Video Export")
        if st.button("🎥 Generate Video Summary", type="primary", use_container_width=True):
            export_mp4()

    st.divider()

    col1_nav, col2_nav, _ = st.columns([1, 1, 3])
    with col1_nav:
        if st.button("← Back to Customize"):
            st.session_state.step = 2
            st.rerun()
    with col2_nav:
        if st.button("🔄 Start Over"):
            reset_session()
            st.rerun()


# --- MP3 Export Function ---
def export_mp3():
    """Embed the word timestamps into an MP3 and offer it for download.

    The stored audio bytes are written to a temporary file, passed to
    ``MP3Embedder.embed_sylt_lyrics`` together with the word timestamps and
    the edited text, and the resulting file is shown in an audio player with
    a download button. The temporary input file is always removed.
    """
    audio_path_for_export = None
    log_to_browser_console("--- INFO: Starting MP3 export process. ---")
    try:
        transcription = st.session_state.transcription_data
        with st.spinner("⏳ Exporting MP3... Please wait, this may take a moment."):
            suffix = transcription['original_suffix']
            audio_bytes = transcription['audio_bytes']

            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_audio_file:
                tmp_audio_file.write(audio_bytes)
                audio_path_for_export = tmp_audio_file.name

            embedder = MP3Embedder()
            word_timestamps = transcription['word_timestamps']

            output_filename = f"synced_{Path(transcription.get('original_filename', 'audio')).stem}.mp3"

            output_path, log_messages = embedder.embed_sylt_lyrics(
                audio_path_for_export,
                word_timestamps,
                st.session_state.edited_text,
                output_filename
            )
            log_to_browser_console(log_messages)

        st.subheader("✅ Export Complete")

        # Guard against a falsy path so a failed embed reports the failure
        # message below instead of raising inside os.path.exists.
        if output_path and os.path.exists(output_path):
            with open(output_path, 'rb') as f:
                audio_bytes_to_download = f.read()

            st.audio(audio_bytes_to_download, format='audio/mp3')

            verification = embedder.verify_sylt_embedding(output_path)
            if verification.get('has_sylt'):
                st.success(f"Successfully embedded {verification.get('sylt_entries', 0)} words!")
            else:
                st.warning("Warning: Could not verify SYLT embedding.")

            st.download_button(
                "Download Synced MP3",
                audio_bytes_to_download,
                output_filename,
                "audio/mpeg",
                use_container_width=True
            )
        else:
            st.error("Failed to create the final MP3 file.")

    except Exception as e:
        st.error(f"An unexpected error occurred during MP3 export: {e}")
        log_to_browser_console(f"--- FATAL ERROR in export_mp3: {traceback.format_exc()} ---")
    finally:
        if audio_path_for_export and os.path.exists(audio_path_for_export):
            os.unlink(audio_path_for_export)


# --- Placeholder and Utility Functions ---
def export_mp4():
    """Placeholder for MP4 export; only informs the user it is not implemented."""
    with st.spinner("⏳ Preparing video export..."):
        time.sleep(1)  # Simulate work to provide feedback
    st.info("MP4 export functionality is not yet implemented.")


def reset_session():
    """Reset the session state by clearing specific keys and re-initializing.

    The language and translation settings are not in the cleared list, so
    they are left as they are.
    """
    log_to_browser_console("--- INFO: Resetting session state. ---")
    keys_to_clear = ['step', 'audio_data', 'transcription_data', 'edited_text', 'video_style', 'new_recording']
    for key in keys_to_clear:
        if key in st.session_state:
            del st.session_state[key]
    initialize_session_state()


# --- Entry Point ---
if __name__ == "__main__":
    if check_api_key():
        initialize_session_state()
        main()