Spaces:
Sleeping
Sleeping
| # app.py - Refactored to eliminate recorder_server.py dependency | |
| import streamlit as st | |
| import os | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| import time | |
| import traceback | |
| import streamlit.components.v1 as components | |
| from st_audiorec import st_audiorec # Import the new recorder component | |
| # --- Critical Imports and Initial Checks --- | |
| AUDIO_PROCESSOR_CLASS = None | |
| IMPORT_ERROR_TRACEBACK = None | |
| try: | |
| from audio_processor import AudioProcessor | |
| AUDIO_PROCESSOR_CLASS = AudioProcessor | |
| except Exception: | |
| IMPORT_ERROR_TRACEBACK = traceback.format_exc() | |
| from video_generator import VideoGenerator | |
| from mp3_embedder import MP3Embedder | |
| from utils import format_timestamp | |
| from translator import get_translator, UI_TRANSLATIONS | |
| from dotenv import load_dotenv | |
| # --- API Key Check --- | |
def check_api_key():
    """Verify that the Gemini API key is configured.

    Loads variables from a local `.env` file first, then checks the
    environment. When the key is missing, renders step-by-step setup
    instructions in the UI.

    Returns:
        bool: True when GEMINI_API_KEY is set, False otherwise.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True
    # Key is absent: explain how to configure it and refuse to start.
    st.error("π΄ FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
1. **Find the file named `.env.example`** in the `syncmaster2` directory.
2. **Rename it to `.env`**.
3. **Open the `.env` file** with a text editor.
4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
6. **Save the file and restart the application.**
""")
    return False
| # --- Page Configuration --- | |
# Configure the browser tab (title/icon) and use the full-width layout.
# Streamlit requires this to be the first st.* command executed per run.
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="π΅",
    layout="wide"
)
| # --- Browser Console Logging Utility --- | |
def log_to_browser_console(messages):
    """Forward backend log lines to the browser's devtools console.

    Accepts a single string or a list of strings. Each entry is routed to
    console.error / console.warn / console.debug / console.log based on
    marker substrings in the text ("--- ERROR"/"--- FATAL", "--- WARNING",
    "--- DEBUG"); everything else goes to console.log.
    """
    if isinstance(messages, str):
        messages = [messages]
    # JSON-encode every entry so it can be spliced safely into JS source.
    payload = ', '.join(json.dumps(str(entry)) for entry in messages)
    js_code = f"""
    <script>
    (function() {{
        const logs = [{payload}];
        console.group("Backend Logs from SyncMaster");
        logs.forEach(log => {{
            const content = String(log);
            if (content.includes('--- ERROR') || content.includes('--- FATAL')) {{
                console.error(log);
            }} else if (content.includes('--- WARNING')) {{
                console.warn(log);
            }} else if (content.includes('--- DEBUG')) {{
                console.debug(log);
            }} else {{
                console.log(log);
            }}
        }});
        console.groupEnd();
    }})();
    </script>
    """
    # height=0 keeps the injected component invisible in the page.
    components.html(js_code, height=0, scrolling=False)
| # --- Session State Initialization --- | |
def initialize_session_state():
    """Seed st.session_state with defaults for any missing keys.

    Safe to call on every rerun: a key that already exists is never
    overwritten, so user selections survive Streamlit reruns.
    """
    # One defaults table instead of nine repetitive membership checks.
    defaults = {
        'step': 1,                    # current wizard step (1..3)
        'audio_data': None,           # raw bytes of the uploaded file
        'language': 'en',             # UI language code ('en' or 'ar')
        'enable_translation': True,   # whether to run the translation path
        'target_language': 'ar',      # translation target language code
        'transcription_data': None,   # dict built by run_audio_processing
        'edited_text': "",            # user-editable transcription text
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,        # last recorder payload already processed
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
| # --- Centralized Audio Processing Function --- | |
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """
    A single, robust function to handle all audio processing.

    Writes `audio_bytes` to a temporary file (the processor works on file
    paths), runs AI transcription — with translation when the session's
    `enable_translation` flag is set — stores the results in
    `st.session_state.transcription_data`, and advances the wizard to step 2
    on success.

    Args:
        audio_bytes: Raw audio payload from an upload or a recording.
        original_filename: Only its suffix is used, for the temp file
            extension and the stored 'original_suffix' value.
    """
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return
    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        # Persist the bytes to disk; the processor expects a file path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(original_filename).suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name
        # main() stops the app earlier if AUDIO_PROCESSOR_CLASS failed to import.
        processor = AUDIO_PROCESSOR_CLASS()
        result_data = None
        full_text = ""
        word_timestamps = []
        # Determine which processing path to take
        if st.session_state.enable_translation:
            with st.spinner("β³ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language
                )
                log_to_browser_console(processor_logs)
            if not result_data or not result_data.get('original_text'):
                st.warning("Could not generate transcription with translation. Check browser console (F12) for logs.")
                return
            st.session_state.transcription_data = {
                'text': result_data['original_text'],
                'translated_text': result_data['translated_text'],
                'word_timestamps': result_data['word_timestamps'],
                'audio_bytes': audio_bytes,
                'original_suffix': Path(original_filename).suffix,
                'translation_success': result_data.get('translation_success', False),
                'detected_language': result_data.get('language_detected', 'unknown')
            }
            st.session_state.edited_text = result_data['original_text']
        else:  # Standard processing without translation
            with st.spinner("β³ Performing AI Transcription... please wait."):
                word_timestamps, processor_logs = processor.get_word_timestamps(tmp_file_path)
                log_to_browser_console(processor_logs)
            if not word_timestamps:
                st.warning("Could not generate timestamps. Check browser console (F12) for logs.")
                return
            full_text = " ".join([d['word'] for d in word_timestamps])
            st.session_state.transcription_data = {
                'text': full_text,
                'word_timestamps': word_timestamps,
                'audio_bytes': audio_bytes,
                'original_suffix': Path(original_filename).suffix,
                'translation_success': False
            }
            st.session_state.edited_text = full_text
        st.session_state.step = 2
        st.success("π AI processing complete! Please review the results.")
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        # NOTE(review): this block runs on *every* exit path — including the
        # early `return`s after a warning and the except branch — so the app
        # always reruns ~1s after processing ends. The rerun presumably clears
        # any warning/error rendered above; confirm this is intended.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
        time.sleep(1)
        st.rerun()
| # --- Main Application Logic --- | |
def main():
    """Render sidebar settings, the page header, the three-step progress
    indicator, and dispatch to the active wizard step."""
    initialize_session_state()
    # Hide the default spinner element and apply a short fade-in transition.
    st.markdown("""
    <style>
    .stSpinner { display: none !important; }
    .main .block-container { animation: fadeIn 0.2s ease-in-out; }
    @keyframes fadeIn { from { opacity: 0; } to { opacity: 1; } }
    .block-container { padding-top: 1rem; }
    </style>
    """, unsafe_allow_html=True)
    with st.sidebar:
        st.markdown("## π Language Settings")
        ui_langs = {'English': 'en', 'Ψ§ΩΨΉΨ±Ψ¨ΩΨ©': 'ar'}
        chosen_ui_lang = st.selectbox(
            "Interface Language",
            options=list(ui_langs.keys()),
            index=0 if st.session_state.language == 'en' else 1
        )
        st.session_state.language = ui_langs[chosen_ui_lang]
        st.markdown("## π€ Translation Settings")
        english_ui = st.session_state.language == 'en'
        st.session_state.enable_translation = st.checkbox(
            "Enable AI Translation" if english_ui else "ΨͺΩΨΉΩΩ Ψ§ΩΨͺΨ±Ψ¬Ω Ψ© Ψ¨Ψ§ΩΨ°ΩΨ§Ψ‘ Ψ§ΩΨ§Ψ΅Ψ·ΩΨ§ΨΉΩ",
            value=st.session_state.enable_translation,
            help="Automatically translate transcribed text" if english_ui else "ΨͺΨ±Ψ¬Ω Ψ© Ψ§ΩΩΨ΅ ΨͺΩΩΨ§Ψ¦ΩΨ§Ω"
        )
        if st.session_state.enable_translation:
            targets = {
                'Arabic (Ψ§ΩΨΉΨ±Ψ¨ΩΨ©)': 'ar', 'English': 'en', 'French (FranΓ§ais)': 'fr', 'Spanish (EspaΓ±ol)': 'es'
            }
            chosen_target = st.selectbox(
                "Target Language" if english_ui else "Ψ§ΩΩΨΊΨ© Ψ§ΩΩ Ψ³ΨͺΩΨ―ΩΨ©",
                options=list(targets.keys()), index=0
            )
            st.session_state.target_language = targets[chosen_target]
    st.title("π΅ SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### Ω ΩΨ΅Ψ© Ψ§ΩΩ Ψ²Ψ§Ω ΩΨ© Ψ§ΩΨ°ΩΩΨ© Ψ¨ΩΩ Ψ§ΩΨ΅ΩΨͺ ΩΨ§ΩΩΨ΅")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")
    # Progress indicator: completed/current steps get a check mark badge.
    steps = [
        (1, '1οΈβ£', 'Step 1: Upload & Process'),
        (2, '2οΈβ£', 'Step 2: Review & Customize'),
        (3, '3οΈβ£', 'Step 3: Export'),
    ]
    for column, (number, badge, label) in zip(st.columns(3), steps):
        with column:
            marker = 'β ' if st.session_state.step >= number else badge
            st.markdown(f"**{marker} {label}**")
    st.divider()
    if AUDIO_PROCESSOR_CLASS is None:
        # The audio_processor import failed at module load; surface the
        # captured traceback and halt this script run.
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()
    dispatch = {
        1: step_1_upload_and_process,
        2: step_2_review_and_customize,
        3: step_3_export,
    }
    handler = dispatch.get(st.session_state.step)
    if handler is not None:
        handler()
| # --- Step 1: Upload and Process --- | |
def step_1_upload_and_process():
    """Step 1 UI: choose an audio source (file upload or microphone
    recording) and kick off AI processing."""
    st.header("Step 1: Choose Your Audio Source")
    upload_tab, record_tab = st.tabs(["π€ Upload a File", "ποΈ Record Audio"])
    with upload_tab:
        st.subheader("Upload an existing audio file")
        chosen_file = st.file_uploader(
            "Choose an audio file",
            type=['mp3', 'wav', 'm4a'],
            help="Supported formats: MP3, WAV, M4A"
        )
        if chosen_file:
            # Cache the raw bytes so preview/processing survive reruns.
            st.session_state.audio_data = chosen_file.getvalue()
            st.success(f"File ready for processing: {chosen_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("π Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, chosen_file.name)
        if st.session_state.audio_data:
            if st.button("π Use a Different File"):
                reset_session()
                st.rerun()
    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Click the stop icon when you are finished. Processing will begin automatically.")
        # The recorder component returns WAV bytes once a take is finished.
        recording = st_audiorec()
        # Only process a payload we have not seen yet, so Streamlit reruns
        # don't re-trigger processing for the same recording.
        if recording is not None and st.session_state.new_recording != recording:
            st.session_state.new_recording = recording
            run_audio_processing(st.session_state.new_recording, "recorded_audio.wav")
| # --- Step 2: Review and Customize --- | |
def step_2_review_and_customize():
    """Step 2 UI: review the transcription (and translation, if any), edit
    the text used for export, and customize the video style."""
    st.header("Step 2: Review & Customize")
    if not st.session_state.get('transcription_data'):
        # Nothing to review — send the user back to step 1.
        st.error("No data found. Please return to Step 1.")
        if st.button("β Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return
    data = st.session_state.transcription_data
    # Show the side-by-side original/translated view only when the
    # translation path actually succeeded.
    if data.get('translation_success', False):
        st.success(f"π Translation completed! Detected language: {data.get('detected_language', 'N/A')}")
        original_col, translated_col = st.columns(2)
        with original_col:
            st.subheader("Original Text")
            st.text_area("Original Transcription", value=data['text'], height=150, disabled=True)
        with translated_col:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area("Translated Text", value=data['translated_text'], height=150, disabled=True)
    editor_col, style_col = st.columns([3, 2])
    with editor_col:
        st.subheader("π Text Editor")
        st.info("Edit the original transcribed text below. This text will be used for the final export.")
        st.session_state.edited_text = st.text_area(
            "Transcribed Text", value=st.session_state.edited_text, height=300
        )
    with style_col:
        st.subheader("π¨ Video Style Customization")
        style = st.session_state.video_style
        style['animation_style'] = st.selectbox("Animation Style", ["Karaoke Style", "Pop-up Word"])
        style['text_color'] = st.color_picker("Text Color", style['text_color'])
        style['highlight_color'] = st.color_picker("Highlight Color", style['highlight_color'])
    back_col, _, next_col = st.columns([1, 2, 1])
    with back_col:
        if st.button("β Back to Upload"):
            st.session_state.step = 1
            st.rerun()
    with next_col:
        if st.button("Continue to Export β", type="primary"):
            st.session_state.step = 3
            st.rerun()
| # --- Step 3: Export --- | |
def step_3_export():
    """Step 3 UI: export the synchronized result as an MP3 with embedded
    lyrics, or as an MP4 video (placeholder)."""
    st.header("Step 3: Export Your Synchronized Media")
    if not st.session_state.get('transcription_data'):
        # Nothing to export — send the user back to step 1.
        st.error("No data found. Please return to Step 1.")
        if st.button("β Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return
    mp3_col, mp4_col = st.columns(2)
    with mp3_col:
        st.subheader("π΅ MP3 Export")
        if st.button("π± Export MP3 with Lyrics", type="primary", use_container_width=True):
            export_mp3()
    with mp4_col:
        st.subheader("π¬ MP4 Video Export")
        if st.button("π₯ Generate Video Summary", type="primary", use_container_width=True):
            export_mp4()
    st.divider()
    back_col, restart_col, _ = st.columns([1, 1, 3])
    with back_col:
        if st.button("β Back to Customize"):
            st.session_state.step = 2
            st.rerun()
    with restart_col:
        if st.button("π Start Over"):
            reset_session()
            st.rerun()
| # --- MP3 Export Function --- | |
def export_mp3():
    """Embed synchronized (SYLT) lyrics into the processed audio and offer
    the result for download.

    Reads audio bytes and word timestamps from
    st.session_state.transcription_data, writes the audio to a temp file,
    delegates embedding to MP3Embedder, verifies the SYLT frame, and
    renders a player plus a download button. The temp file is always
    removed, even on failure.
    """
    audio_path_for_export = None
    log_to_browser_console("--- INFO: Starting MP3 export process. ---")
    try:
        with st.spinner("β³ Exporting MP3... Please wait, this may take a moment."):
            suffix = st.session_state.transcription_data['original_suffix']
            audio_bytes = st.session_state.transcription_data['audio_bytes']
            # The embedder needs a real file on disk, not raw bytes.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_audio_file:
                tmp_audio_file.write(audio_bytes)
                audio_path_for_export = tmp_audio_file.name
            embedder = MP3Embedder()
            word_timestamps = st.session_state.transcription_data['word_timestamps']
            # NOTE(review): run_audio_processing never stores an
            # 'original_filename' key, so this .get() always falls back to
            # 'audio' and the download is named "synced_audio.mp3".
            output_filename = f"synced_{Path(st.session_state.transcription_data.get('original_filename', 'audio')).stem}.mp3"
            output_path, log_messages = embedder.embed_sylt_lyrics(
                audio_path_for_export, word_timestamps,
                st.session_state.edited_text, output_filename
            )
            log_to_browser_console(log_messages)
            st.subheader("β Export Complete")
            if os.path.exists(output_path):
                with open(output_path, 'rb') as f:
                    audio_bytes_to_download = f.read()
                st.audio(audio_bytes_to_download, format='audio/mp3')
                # Sanity-check that the SYLT frame actually made it in.
                verification = embedder.verify_sylt_embedding(output_path)
                if verification.get('has_sylt'):
                    st.success(f"Successfully embedded {verification.get('sylt_entries', 0)} words!")
                else:
                    st.warning("Warning: Could not verify SYLT embedding.")
                st.download_button("Download Synced MP3", audio_bytes_to_download,
                                   output_filename, "audio/mpeg", use_container_width=True)
            else:
                st.error("Failed to create the final MP3 file.")
    except Exception as e:
        st.error(f"An unexpected error occurred during MP3 export: {e}")
        log_to_browser_console(f"--- FATAL ERROR in export_mp3: {traceback.format_exc()} ---")
    finally:
        # Always remove the temp input file, even when embedding failed.
        if audio_path_for_export and os.path.exists(audio_path_for_export):
            os.unlink(audio_path_for_export)
| # --- Placeholder and Utility Functions --- | |
def export_mp4():
    """Placeholder for MP4 export: shows brief spinner feedback, then an
    informational notice that the feature is not implemented yet."""
    with st.spinner("β³ Preparing video export..."):
        time.sleep(1)  # Simulate work to provide feedback
        st.info("MP4 export functionality is not yet implemented.")
def reset_session():
    """Clear workflow-related session keys and restore their defaults.

    Language and translation preferences are deliberately left untouched
    so the user's settings survive a "start over".
    """
    log_to_browser_console("--- INFO: Resetting session state. ---")
    for key in ('step', 'audio_data', 'transcription_data',
                'edited_text', 'video_style', 'new_recording'):
        # pop() with a default removes the key only if it exists.
        st.session_state.pop(key, None)
    initialize_session_state()
| # --- Entry Point --- | |
| if __name__ == "__main__": | |
| if check_api_key(): | |
| initialize_session_state() | |
| main() | |