# app.py - Refactored to eliminate recorder_server.py dependency
import streamlit as st
import os
import tempfile
import json
from pathlib import Path
import time
import traceback
import streamlit.components.v1 as components
from st_audiorec import st_audiorec # Import the new recorder component
# --- Critical Imports and Initial Checks ---
# Import AudioProcessor defensively: if the import fails (missing dependency,
# broken environment), capture the traceback so main() can render it in the UI
# instead of the whole app crashing at import time.
AUDIO_PROCESSOR_CLASS = None
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    # Displayed via st.code() in main() when AUDIO_PROCESSOR_CLASS is None.
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()
from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS
from dotenv import load_dotenv
# --- API Key Check ---
def check_api_key():
    """Return True when GEMINI_API_KEY is configured; otherwise show setup help.

    Loads environment variables from a local .env file first, so a key placed
    there is picked up automatically.
    """
    load_dotenv()
    if os.getenv("GEMINI_API_KEY"):
        return True
    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
    1. **Find the file named `.env.example`** in the `syncmaster2` directory.
    2. **Rename it to `.env`**.
    3. **Open the `.env` file** with a text editor.
    4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
    5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
    6. **Save the file and restart the application.**
    """)
    return False
# --- Page Configuration ---
# Configure the Streamlit page; this must be the first st.* call executed.
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"
)
# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Inject JavaScript that logs messages to the browser's console.

    Args:
        messages: A single string or a list of strings; each is emitted as
            one ``console.log`` call in the user's browser (F12 console).
    """
    if isinstance(messages, str):
        messages = [messages]
    # json.dumps escapes quotes/newlines so each message is a valid JS string literal.
    escaped_messages = [json.dumps(str(msg)) for msg in messages]
    # BUG FIX: the previous version built an empty f-string and never used
    # escaped_messages, so nothing was ever logged. Emit a real <script> block.
    log_statements = "\n".join(f"console.log({msg});" for msg in escaped_messages)
    js_code = f"<script>{log_statements}</script>"
    # height=0 keeps the injected iframe invisible in the page layout.
    components.html(js_code, height=0, scrolling=False)
# --- Session State Initialization ---
def initialize_session_state():
    """Populate st.session_state with default values for any missing keys."""
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,
    }
    # Only fill in what is absent so reruns never clobber user selections.
    for key, default_value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default_value
# --- Centralized Audio Processing Function ---
def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """
    A single, robust function to handle all audio processing.

    Writes the raw audio to a temporary file, runs AI transcription (with
    optional translation), stores the results in
    ``st.session_state.transcription_data``, and advances the UI to Step 2.

    Args:
        audio_bytes: Raw bytes of the uploaded or recorded audio.
        original_filename: Source file name; its suffix is reused for the
            temp file and the name is stored for export file naming.
    """
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return
    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        # The processor API expects a file path, so persist the bytes first.
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(original_filename).suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name
        processor = AUDIO_PROCESSOR_CLASS()
        result_data = None
        full_text = ""
        word_timestamps = []
        # Determine which processing path to take
        if st.session_state.enable_translation:
            with st.spinner("⏳ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language
                )
                log_to_browser_console(processor_logs)
                if not result_data or not result_data.get('original_text'):
                    st.warning("Could not generate transcription with translation. Check browser console (F12) for logs.")
                    return
                st.session_state.transcription_data = {
                    'text': result_data['original_text'],
                    'translated_text': result_data['translated_text'],
                    'word_timestamps': result_data['word_timestamps'],
                    'audio_bytes': audio_bytes,
                    'original_suffix': Path(original_filename).suffix,
                    # BUG FIX: store the source name so export_mp3() can build
                    # "synced_<name>.mp3" instead of always "synced_audio.mp3".
                    'original_filename': original_filename,
                    'translation_success': result_data.get('translation_success', False),
                    'detected_language': result_data.get('language_detected', 'unknown')
                }
                st.session_state.edited_text = result_data['original_text']
        else:  # Standard processing without translation
            with st.spinner("⏳ Performing AI Transcription... please wait."):
                word_timestamps, processor_logs = processor.get_word_timestamps(tmp_file_path)
                log_to_browser_console(processor_logs)
                if not word_timestamps:
                    st.warning("Could not generate timestamps. Check browser console (F12) for logs.")
                    return
                full_text = " ".join([d['word'] for d in word_timestamps])
                st.session_state.transcription_data = {
                    'text': full_text,
                    'word_timestamps': word_timestamps,
                    'audio_bytes': audio_bytes,
                    'original_suffix': Path(original_filename).suffix,
                    'original_filename': original_filename,  # used by export_mp3()
                    'translation_success': False
                }
                st.session_state.edited_text = full_text
        st.session_state.step = 2
        st.success("🎉 AI processing complete! Please review the results.")
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        # Always clean up the temp file; note st.rerun() fires even after an
        # early return or error, so the UI reflects the latest session state.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
        time.sleep(1)
        st.rerun()
# --- Main Application Logic ---
def main():
    """Top-level UI: sidebar settings, header, progress row, and step routing."""
    initialize_session_state()
    st.markdown("""
    """, unsafe_allow_html=True)
    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        lang_by_label = {'English': 'en', 'العربية': 'ar'}
        chosen_label = st.selectbox(
            "Interface Language",
            options=list(lang_by_label.keys()),
            index=0 if st.session_state.language == 'en' else 1
        )
        st.session_state.language = lang_by_label[chosen_label]
        is_english = st.session_state.language == 'en'
        st.markdown("## 🔤 Translation Settings")
        st.session_state.enable_translation = st.checkbox(
            "Enable AI Translation" if is_english else "تفعيل الترجمة بالذكاء الاصطناعي",
            value=st.session_state.enable_translation,
            help="Automatically translate transcribed text" if is_english else "ترجمة النص تلقائياً"
        )
        if st.session_state.enable_translation:
            target_by_label = {
                'Arabic (العربية)': 'ar', 'English': 'en', 'French (Français)': 'fr', 'Spanish (Español)': 'es'
            }
            chosen_target = st.selectbox(
                "Target Language" if is_english else "اللغة المستهدفة",
                options=list(target_by_label.keys()), index=0
            )
            st.session_state.target_language = target_by_label[chosen_target]
    st.title("🎵 SyncMaster")
    subtitle = (
        "### منصة المزامنة الذكية بين الصوت والنص"
        if st.session_state.language == 'ar'
        else "### The Intelligent Audio-Text Synchronization Platform"
    )
    st.markdown(subtitle)
    # Progress indicator: a checkmark for every step already reached.
    step_names = ["Upload & Process", "Review & Customize", "Export"]
    number_icons = ["1️⃣", "2️⃣", "3️⃣"]
    for position, column in enumerate(st.columns(3), start=1):
        with column:
            icon = '✅' if st.session_state.step >= position else number_icons[position - 1]
            st.markdown(f"**{icon} Step {position}: {step_names[position - 1]}**")
    st.divider()
    # If the AudioProcessor import failed at startup, show the traceback and halt.
    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()
    # Route to the renderer for the current workflow step.
    routes = {
        1: step_1_upload_and_process,
        2: step_2_review_and_customize,
        3: step_3_export,
    }
    handler = routes.get(st.session_state.step)
    if handler is not None:
        handler()
# --- Step 1: Upload and Process ---
def step_1_upload_and_process():
    """Render Step 1: let the user upload a file or record from the microphone."""
    st.header("Step 1: Choose Your Audio Source")
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])
    with upload_tab:
        st.subheader("Upload an existing audio file")
        chosen_file = st.file_uploader(
            "Choose an audio file",
            type=['mp3', 'wav', 'm4a'],
            help="Supported formats: MP3, WAV, M4A"
        )
        if chosen_file:
            st.session_state.audio_data = chosen_file.getvalue()
            st.success(f"File ready for processing: {chosen_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, chosen_file.name)
        if st.session_state.audio_data and st.button("🔄 Use a Different File"):
            reset_session()
            st.rerun()
    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Click the stop icon when you are finished. Processing will begin automatically.")
        # Use the audio recorder component
        recorded_bytes = st_audiorec()
        # Only kick off processing when the recording differs from the last
        # one we handled, so reruns don't reprocess the same audio.
        if recorded_bytes is not None and recorded_bytes != st.session_state.new_recording:
            st.session_state.new_recording = recorded_bytes
            run_audio_processing(st.session_state.new_recording, "recorded_audio.wav")
# --- Step 2: Review and Customize ---
def step_2_review_and_customize():
    """Render Step 2: review transcription/translation and pick video styling."""
    st.header("Step 2: Review & Customize")
    data = st.session_state.get('transcription_data')
    if not data:
        st.error("No data found. Please return to Step 1.")
        if st.button("← Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return
    # Display translation results if available
    if data.get('translation_success', False):
        st.success(f"🌐 Translation completed! Detected language: {data.get('detected_language', 'N/A')}")
        original_col, translated_col = st.columns(2)
        with original_col:
            st.subheader("Original Text")
            st.text_area("Original Transcription", value=data['text'], height=150, disabled=True)
        with translated_col:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area("Translated Text", value=data['translated_text'], height=150, disabled=True)
    editor_col, style_col = st.columns([3, 2])
    with editor_col:
        st.subheader("📝 Text Editor")
        st.info("Edit the original transcribed text below. This text will be used for the final export.")
        st.session_state.edited_text = st.text_area(
            "Transcribed Text", value=st.session_state.edited_text, height=300
        )
    with style_col:
        st.subheader("🎨 Video Style Customization")
        style = st.session_state.video_style
        style['animation_style'] = st.selectbox("Animation Style", ["Karaoke Style", "Pop-up Word"])
        style['text_color'] = st.color_picker("Text Color", style['text_color'])
        style['highlight_color'] = st.color_picker("Highlight Color", style['highlight_color'])
    back_col, _, forward_col = st.columns([1, 2, 1])
    with back_col:
        if st.button("← Back to Upload"):
            st.session_state.step = 1
            st.rerun()
    with forward_col:
        if st.button("Continue to Export →", type="primary"):
            st.session_state.step = 3
            st.rerun()
# --- Step 3: Export ---
def step_3_export():
    """Render Step 3: offer MP3/MP4 export actions plus navigation controls."""
    st.header("Step 3: Export Your Synchronized Media")
    if not st.session_state.get('transcription_data'):
        st.error("No data found. Please return to Step 1.")
        if st.button("← Back to Step 1"):
            st.session_state.step = 1
            st.rerun()
        return
    mp3_col, mp4_col = st.columns(2)
    with mp3_col:
        st.subheader("🎵 MP3 Export")
        if st.button("📱 Export MP3 with Lyrics", type="primary", use_container_width=True):
            export_mp3()
    with mp4_col:
        st.subheader("🎬 MP4 Video Export")
        if st.button("🎥 Generate Video Summary", type="primary", use_container_width=True):
            export_mp4()
    st.divider()
    back_col, restart_col, _ = st.columns([1, 1, 3])
    with back_col:
        if st.button("← Back to Customize"):
            st.session_state.step = 2
            st.rerun()
    with restart_col:
        if st.button("🔄 Start Over"):
            reset_session()
            st.rerun()
# --- MP3 Export Function ---
def export_mp3():
    """Embed synchronized (SYLT) lyrics into the session audio and offer it for download.

    Reads audio bytes and word timestamps from st.session_state.transcription_data,
    writes the audio to a temp file, embeds the edited text as SYLT lyrics via
    MP3Embedder, verifies the embedding, and renders playback plus a download button.
    """
    audio_path_for_export = None
    log_to_browser_console("--- INFO: Starting MP3 export process. ---")
    try:
        with st.spinner("⏳ Exporting MP3... Please wait, this may take a moment."):
            suffix = st.session_state.transcription_data['original_suffix']
            audio_bytes = st.session_state.transcription_data['audio_bytes']
            # The embedder works on a file path, so persist the bytes first.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_audio_file:
                tmp_audio_file.write(audio_bytes)
                audio_path_for_export = tmp_audio_file.name
            embedder = MP3Embedder()
            word_timestamps = st.session_state.transcription_data['word_timestamps']
            # Falls back to the stem 'audio' when 'original_filename' is absent
            # from transcription_data.
            output_filename = f"synced_{Path(st.session_state.transcription_data.get('original_filename', 'audio')).stem}.mp3"
            output_path, log_messages = embedder.embed_sylt_lyrics(
                audio_path_for_export, word_timestamps,
                st.session_state.edited_text, output_filename
            )
            log_to_browser_console(log_messages)
            st.subheader("✅ Export Complete")
            if os.path.exists(output_path):
                with open(output_path, 'rb') as f:
                    audio_bytes_to_download = f.read()
                st.audio(audio_bytes_to_download, format='audio/mp3')
                # Re-read the produced file to confirm the SYLT frame is present.
                verification = embedder.verify_sylt_embedding(output_path)
                if verification.get('has_sylt'):
                    st.success(f"Successfully embedded {verification.get('sylt_entries', 0)} words!")
                else:
                    st.warning("Warning: Could not verify SYLT embedding.")
                st.download_button("Download Synced MP3", audio_bytes_to_download,
                    output_filename, "audio/mpeg", use_container_width=True)
            else:
                st.error("Failed to create the final MP3 file.")
    except Exception as e:
        st.error(f"An unexpected error occurred during MP3 export: {e}")
        log_to_browser_console(f"--- FATAL ERROR in export_mp3: {traceback.format_exc()} ---")
    finally:
        # Remove the temp source file whether or not the export succeeded.
        if audio_path_for_export and os.path.exists(audio_path_for_export):
            os.unlink(audio_path_for_export)
# --- Placeholder and Utility Functions ---
def export_mp4():
    """Placeholder for MP4 export; informs the user it is not implemented yet."""
    with st.spinner("⏳ Preparing video export..."):
        time.sleep(1)  # Simulate work to provide feedback
        st.info("MP4 export functionality is not yet implemented.")
def reset_session():
    """Resets the session state by clearing specific keys and re-initializing."""
    log_to_browser_console("--- INFO: Resetting session state. ---")
    stale_keys = ('step', 'audio_data', 'transcription_data', 'edited_text', 'video_style', 'new_recording')
    for stale_key in stale_keys:
        # pop() with a default is a no-op when the key is already absent.
        st.session_state.pop(stale_key, None)
    initialize_session_state()
# --- Entry Point ---
if __name__ == "__main__":
    # Refuse to launch the UI unless a Gemini API key is configured.
    if check_api_key():
        initialize_session_state()
        main()