# app.py - Refactored to eliminate recorder_server.py dependency
import streamlit as st
import os
import tempfile
import json
from pathlib import Path
import time
import traceback
import streamlit.components.v1 as components
import hashlib
# from st_audiorec import st_audiorec # Import the new recorder component - OLD
# Reduce metrics/usage writes that can cause permission errors on hosted environments
# Best-effort: disable usage-stats gathering, since the metrics write can
# fail with permission errors on hosted (read-only) environments.
try:
    st.set_option('browser.gatherUsageStats', False)
except Exception:
    # NOTE(review): this option may not be settable at runtime in newer
    # Streamlit versions; the broad except keeps startup from failing.
    pass
# Robust component declaration: prefer local build, else fall back to pip package
parent_dir = os.path.dirname(os.path.abspath(__file__))
build_dir = os.path.join(parent_dir, "custom_components/st-audiorec/st_audiorec/frontend/build")
_audiorec_component = None  # cached declare_component handle (declared once per process)


def st_audiorec(key=None):
    """Return the audio recorder component's value.

    Tries the local frontend build first, then falls back to the
    pip-installed ``st_audiorec`` package.  Returns ``None`` (after showing
    a warning) when neither source is available or initialization fails.

    Args:
        key: Optional Streamlit widget key forwarded to the component.
    """
    global _audiorec_component
    try:
        if os.path.isdir(build_dir):
            # FIX: previously the component was re-declared on every script
            # rerun; declare it once and reuse the cached handle.
            if _audiorec_component is None:
                _audiorec_component = components.declare_component("st_audiorec", path=build_dir)
            return _audiorec_component(key=key, default=0)
        # Fallback to pip-installed component if available
        try:
            from st_audiorec import st_audiorec as st_audiorec_pkg
            return st_audiorec_pkg(key=key)
        except Exception:
            st.warning("Audio recorder component is unavailable on this deployment (missing local build and pip fallback).")
            return None
    except Exception:
        # Final safety net
        st.warning("Failed to initialize audio recorder component.")
        return None
# --- Critical Imports and Initial Checks ---
# Import the heavy AudioProcessor behind a guard: if it fails (missing
# dependencies, bad environment), keep the traceback so main() can display
# it in the UI instead of crashing at import time.
AUDIO_PROCESSOR_CLASS = None
IMPORT_ERROR_TRACEBACK = None
try:
    from audio_processor import AudioProcessor
    AUDIO_PROCESSOR_CLASS = AudioProcessor
except Exception:
    IMPORT_ERROR_TRACEBACK = traceback.format_exc()
from video_generator import VideoGenerator
from mp3_embedder import MP3Embedder
from utils import format_timestamp
from translator import get_translator, UI_TRANSLATIONS
import requests
from dotenv import load_dotenv
# --- API Key Check ---
def check_api_key():
    """Verify the Gemini API key is configured; show setup steps when missing.

    Returns:
        True when GEMINI_API_KEY is present in the environment (after
        loading .env), False otherwise.
    """
    load_dotenv()
    # Guard clause: a configured key means nothing else to do.
    if os.getenv("GEMINI_API_KEY"):
        return True
    st.error("🔴 FATAL ERROR: GEMINI_API_KEY is not set!")
    st.info("To fix this, please follow these steps:")
    st.markdown("""
    1. **Find the file named `.env.example`** in the `syncmaster2` directory.
    2. **Rename it to `.env`**.
    3. **Open the `.env` file** with a text editor.
    4. **Get your free API key** from [Google AI Studio](https://aistudio.google.com/app/apikey).
    5. **Paste your key** into the file, replacing `"PASTE_YOUR_GEMINI_API_KEY_HERE"`.
    6. **Save the file and restart the application.**
    """)
    return False
# --- Page Configuration ---
# Must run before any other st.* rendering call.
st.set_page_config(
    page_title="SyncMaster - AI Audio-Text Synchronization",
    page_icon="🎵",
    layout="wide"  # use the full browser width
)
# --- Browser Console Logging Utility ---
def log_to_browser_console(messages):
    """Injects JavaScript to log messages to the browser's console.

    Accepts a single string or a list of strings.  Each message is
    JSON-escaped so arbitrary text is safe to embed inside the script tag.
    """
    if isinstance(messages, str):
        messages = [messages]
    escaped_messages = [json.dumps(str(msg)) for msg in messages]
    # FIX: the script body was empty, so the escaped messages were computed
    # but never actually sent to the browser console.
    log_calls = "\n".join(f"console.log({msg});" for msg in escaped_messages)
    js_code = f"<script>\n{log_calls}\n</script>"
    components.html(js_code, height=0, scrolling=False)
# --- Session State Initialization ---
def initialize_session_state():
    """Seed st.session_state with defaults for any keys not yet present."""
    defaults = {
        'step': 1,
        'audio_data': None,
        'language': 'en',
        'enable_translation': True,
        'target_language': 'ar',
        'transcription_data': None,
        'edited_text': "",
        'video_style': {
            'animation_style': 'Karaoke Style', 'text_color': '#FFFFFF',
            'highlight_color': '#FFD700', 'background_color': '#000000',
            'font_family': 'Arial', 'font_size': 48
        },
        'new_recording': None,
        # Transcript feed (latest prepended) and its dedupe set
        'transcript_feed': [],        # list of {id, ts, text}
        'transcript_ids': set(),
        # Incremental broadcast state
        'broadcast_segments': [],     # [{id, recording_id, start_ms, end_ms, checksum, text}]
        'lastFetchedEnd_ms': 0,
        # Broadcast translation language (separate from the general UI translation target)
        'broadcast_translation_lang': 'ar',
    }
    for state_key, default_value in defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default_value
# --- Centralized Audio Processing Function ---
def _record_transcript_snippet(audio_bytes, text):
    """Prepend *text* to the transcript feed, deduplicating by audio digest.

    The MD5 of the raw audio bytes identifies one snapshot; a snapshot seen
    before is ignored.  After inserting, ``st.session_state.edited_text`` is
    rebuilt so the newest snippet appears first.
    """
    try:
        digest = hashlib.md5(audio_bytes).hexdigest()
    except Exception:
        # Hashing failed (unexpected input type); fall back to a time-based id.
        digest = f"snap-{int(time.time()*1000)}"
    if digest in st.session_state.transcript_ids:
        return
    st.session_state.transcript_ids.add(digest)
    st.session_state.transcript_feed.insert(
        0,
        {"id": digest, "ts": int(time.time() * 1000), "text": text},
    )
    # Rebuild edited_text with newest first
    st.session_state.edited_text = "\n\n".join(
        s["text"] for s in st.session_state.transcript_feed
    )


def run_audio_processing(audio_bytes, original_filename="recorded_audio.wav"):
    """
    A single, robust function to handle all audio processing.

    Writes *audio_bytes* to a temp file, runs the AI transcription (with or
    without translation, per session settings), stores the result in
    ``st.session_state.transcription_data``, updates the transcript feed,
    and finally triggers a rerun so the UI refreshes.

    Args:
        audio_bytes: Raw audio content (e.g. WAV/MP3 bytes).
        original_filename: Used only for its suffix when naming the temp file.
    """
    if not audio_bytes:
        st.error("No audio data provided to process.")
        return
    tmp_file_path = None
    log_to_browser_console("--- INFO: Starting unified audio processing. ---")
    try:
        # The processor operates on files, so persist the bytes to a temp path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(original_filename).suffix) as tmp_file:
            tmp_file.write(audio_bytes)
            tmp_file_path = tmp_file.name
        processor = AUDIO_PROCESSOR_CLASS()
        # Determine which processing path to take
        if st.session_state.enable_translation:
            with st.spinner("⏳ Performing AI Transcription & Translation... please wait."):
                result_data, processor_logs = processor.get_word_timestamps_with_translation(
                    tmp_file_path,
                    st.session_state.target_language,
                )
                log_to_browser_console(processor_logs)
                if not result_data or not result_data.get("original_text"):
                    st.warning(
                        "Could not generate transcription with translation. Check browser console (F12) for logs."
                    )
                    return
                st.session_state.transcription_data = {
                    "text": result_data["original_text"],
                    "translated_text": result_data["translated_text"],
                    "word_timestamps": result_data["word_timestamps"],
                    "audio_bytes": audio_bytes,
                    "original_suffix": Path(original_filename).suffix,
                    "translation_success": result_data.get("translation_success", False),
                    "detected_language": result_data.get("language_detected", "unknown"),
                }
                # Shared feed-update logic (previously duplicated in both branches).
                _record_transcript_snippet(audio_bytes, result_data["original_text"])
        else:  # Standard processing without translation
            with st.spinner("⏳ Performing AI Transcription... please wait."):
                word_timestamps, processor_logs = processor.get_word_timestamps(
                    tmp_file_path
                )
                log_to_browser_console(processor_logs)
                if not word_timestamps:
                    st.warning(
                        "Could not generate timestamps. Check browser console (F12) for logs."
                    )
                    return
                full_text = " ".join([d["word"] for d in word_timestamps])
                st.session_state.transcription_data = {
                    "text": full_text,
                    "word_timestamps": word_timestamps,
                    "audio_bytes": audio_bytes,
                    "original_suffix": Path(original_filename).suffix,
                    "translation_success": False,
                }
                _record_transcript_snippet(audio_bytes, full_text)
        st.session_state.step = 1  # Keep it on the same step
        st.success("🎉 AI processing complete! Results are shown below.")
    except Exception as e:
        st.error("An unexpected error occurred during audio processing!")
        st.exception(e)
        log_to_browser_console(f"--- FATAL ERROR in run_audio_processing: {traceback.format_exc()} ---")
    finally:
        # Always remove the temp file, pause briefly so messages render,
        # then rerun so the refreshed session state is displayed.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.unlink(tmp_file_path)
        time.sleep(1)
        st.rerun()
# --- Main Application Logic ---
def main():
    """Top-level page renderer: sidebar settings, header, progress strip,
    recording settings, and the three workflow steps."""
    initialize_session_state()
    st.markdown("""
    """, unsafe_allow_html=True)
    with st.sidebar:
        st.markdown("## 🌐 Language Settings")
        ui_languages = {'English': 'en', 'العربية': 'ar'}
        chosen_ui_language = st.selectbox(
            "Interface Language",
            options=list(ui_languages.keys()),
            index=0 if st.session_state.language == 'en' else 1
        )
        st.session_state.language = ui_languages[chosen_ui_language]
        st.markdown("## 🔤 Translation Settings")
        arabic_ui = st.session_state.language != 'en'
        st.session_state.enable_translation = st.checkbox(
            "تفعيل الترجمة بالذكاء الاصطناعي" if arabic_ui else "Enable AI Translation",
            value=st.session_state.enable_translation,
            help="ترجمة النص تلقائياً" if arabic_ui else "Automatically translate transcribed text"
        )
        if st.session_state.enable_translation:
            translation_targets = {
                'Arabic (العربية)': 'ar', 'English': 'en', 'French (Français)': 'fr', 'Spanish (Español)': 'es'
            }
            chosen_target = st.selectbox(
                "اللغة المستهدفة" if arabic_ui else "Target Language",
                options=list(translation_targets.keys()), index=0
            )
            st.session_state.target_language = translation_targets[chosen_target]
    st.title("🎵 SyncMaster")
    if st.session_state.language == 'ar':
        st.markdown("### منصة المزامنة الذكية بين الصوت والنص")
    else:
        st.markdown("### The Intelligent Audio-Text Synchronization Platform")
    # Progress strip: each column shows a check once its step is reached.
    step_columns = st.columns(3)
    step_specs = [
        (1, '1️⃣', "Step 1: Upload & Process"),
        (2, '2️⃣', "Step 2: Review & Customize"),
        (3, '3️⃣', "Step 3: Export"),
    ]
    for column, (step_no, badge, label) in zip(step_columns, step_specs):
        with column:
            st.markdown(f"**{'✅' if st.session_state.step >= step_no else badge} {label}**")
    st.divider()
    # Global settings for long recording retention and custom snapshot duration
    with st.expander("⚙️ Recording Settings (Snapshots)", expanded=False):
        st.session_state.setdefault('retention_minutes', 30)
        # 0 means: the Custom snapshot uses the full buffer by default
        st.session_state.setdefault('custom_snapshot_seconds', 0)
        st.session_state.retention_minutes = st.number_input("Retention window (minutes)", min_value=5, max_value=240, value=st.session_state.retention_minutes)
        st.session_state.custom_snapshot_seconds = st.number_input("Custom snapshot (seconds; 0 = full buffer)", min_value=0, max_value=3600, value=st.session_state.custom_snapshot_seconds)
    # Inject globals into the page for the component to pick up
    components.html(f"""
    """, height=0)
    if AUDIO_PROCESSOR_CLASS is None:
        st.error("Fatal Error: The application could not start correctly.")
        st.subheader("An error occurred while trying to import `AudioProcessor`:")
        st.code(IMPORT_ERROR_TRACEBACK, language="python")
        st.stop()
    step_1_upload_and_process()
    # Show results whenever they exist, independent of the current step.
    if st.session_state.transcription_data:
        step_2_review_and_customize()
        step_3_export()
# --- Step 1: Upload and Process ---
def step_1_upload_and_process():
    """Render Step 1: choose an audio source (upload or live recording),
    process uploads/snapshots through the AI pipeline, and show the
    chronological broadcast view of transcribed segments."""
    st.header("Step 1: Choose Your Audio Source")
    upload_tab, record_tab = st.tabs(["📤 Upload a File", "🎙️ Record Audio"])
    with upload_tab:
        st.subheader("Upload an existing audio file")
        uploaded_file = st.file_uploader("Choose an audio file", type=['mp3', 'wav', 'm4a'], help="Supported formats: MP3, WAV, M4A")
        if uploaded_file:
            st.session_state.audio_data = uploaded_file.getvalue()
            st.success(f"File ready for processing: {uploaded_file.name}")
            st.audio(st.session_state.audio_data)
            if st.button("🚀 Start AI Processing", type="primary", use_container_width=True):
                run_audio_processing(st.session_state.audio_data, uploaded_file.name)
        if st.session_state.audio_data:
            if st.button("🔄 Use a Different File"):
                reset_session()
                st.rerun()
    with record_tab:
        st.subheader("Record audio directly from your microphone")
        st.info("Click the microphone icon to start recording. Use the ⏪ buttons to snapshot the last seconds without stopping. Processing can run automatically.")
        # Use the audio recorder component
        wav_audio_data = st_audiorec()
        # Auto-process incoming snapshots using the existing flow (no external server)
        st.session_state.setdefault('auto_process_snapshots', True)
        st.checkbox("Auto-process snapshots (keeps recording)", key='auto_process_snapshots', help="When enabled, any snapshot from the recorder is processed immediately using the classic transcription method.")
        if wav_audio_data:
            # Two possible payload shapes: raw bytes array (legacy) or interval payload dict
            if isinstance(wav_audio_data, dict) and wav_audio_data.get('type') in ('interval_wav', 'no_new'):
                payload = wav_audio_data
                if payload['type'] == 'no_new':
                    st.info("لا توجد أجزاء جديدة.")
                elif payload['type'] == 'interval_wav':
                    # Extract interval audio
                    b = bytes(payload['bytes'])
                    sr = int(payload.get('sr', 16000))  # sample rate; defaults to 16 kHz when absent
                    start_ms = int(payload['start_ms'])
                    end_ms = int(payload['end_ms'])
                    # Dedupe/trim logic
                    if end_ms <= start_ms:
                        st.warning("الجزء المُرسل فارغ.")
                    else:
                        # Prevent overlap with prior segment
                        last_end = st.session_state.lastFetchedEnd_ms or 0
                        eff_start_ms = max(start_ms, last_end)
                        if eff_start_ms < end_ms:
                            # If there is overlap, trim the audio bytes accordingly (assumes WAV PCM16 mono header 44 bytes)
                            try:
                                delta_ms = eff_start_ms - start_ms
                                if delta_ms > 0:
                                    # Only trim when the header looks like canonical RIFF/WAVE.
                                    if len(b) >= 44 and b[0:4] == b'RIFF' and b[8:12] == b'WAVE':
                                        bytes_per_sample = 2  # PCM16 mono
                                        drop_samples = int(sr * (delta_ms / 1000.0))
                                        drop_bytes = drop_samples * bytes_per_sample
                                        # NOTE(review): data_size is computed but never used below — candidate for removal.
                                        data_size = int.from_bytes(b[40:44], 'little') if len(b) >= 44 else len(b) - 44
                                        pcm = b[44:]
                                        if drop_bytes < len(pcm):
                                            pcm_trim = pcm[drop_bytes:]
                                        else:
                                            pcm_trim = b''
                                        new_data_size = len(pcm_trim)
                                        # Rebuild header sizes
                                        header = bytearray(b[:44])
                                        # ChunkSize at offset 4 = 36 + Subchunk2Size
                                        # NOTE(review): the next expression's result is discarded — it
                                        # duplicates the header[4:8] assignment below and can be removed.
                                        (36 + new_data_size).to_bytes(4, 'little')
                                        header[4:8] = (36 + new_data_size).to_bytes(4, 'little')
                                        # Subchunk2Size at offset 40
                                        header[40:44] = new_data_size.to_bytes(4, 'little')
                                        b = bytes(header) + pcm_trim
                                    else:
                                        # Not a recognizable WAV header; keep as-is
                                        pass
                            except Exception as _:
                                # Best-effort trim: on any failure, fall back to the untrimmed bytes.
                                pass
                            # Compute checksum
                            digest = hashlib.md5(b).hexdigest()
                            # Skip if identical checksum and same window
                            exists = any(s.get('checksum') == digest and s.get('start_ms') == eff_start_ms and s.get('end_ms') == end_ms for s in st.session_state.broadcast_segments)
                            if not exists:
                                # Run standard pipeline to get text (no translation to keep it light)
                                # Reuse run_audio_processing internals via a temp path
                                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tf:
                                    tf.write(b)
                                    tmp_path = tf.name
                                try:
                                    processor = AUDIO_PROCESSOR_CLASS()
                                    word_timestamps, processor_logs = processor.get_word_timestamps(tmp_path)
                                    full_text = " ".join([d['word'] for d in word_timestamps]) if word_timestamps else ""
                                finally:
                                    if os.path.exists(tmp_path): os.unlink(tmp_path)
                                # Auto-translate to selected language if enabled
                                translations = {}
                                try:
                                    if full_text and st.session_state.get('enable_translation', True):
                                        translator = get_translator()
                                        sel_lang = st.session_state.get('broadcast_translation_lang', 'ar')
                                        tx, _ = translator.translate_text(full_text, target_language=sel_lang)
                                        if tx:
                                            translations[sel_lang] = tx
                                except Exception:
                                    # Translation is best-effort; the segment is kept untranslated on failure.
                                    pass
                                # Append segment
                                seg = {
                                    'id': digest,
                                    'recording_id': payload.get('session_id', 'local'),
                                    'start_ms': eff_start_ms,
                                    'end_ms': end_ms,
                                    'checksum': digest,
                                    'text': full_text,
                                    'translations': translations,
                                }
                                st.session_state.broadcast_segments.append(seg)
                                # Keep sorted by time
                                st.session_state.broadcast_segments.sort(key=lambda s: s['start_ms'])
                                # Update lastFetchedEnd
                                st.session_state.lastFetchedEnd_ms = end_ms
                                # Also update transcript feed (newest first) and edited text so user sees output immediately
                                if full_text:
                                    if digest not in st.session_state.transcript_ids:
                                        st.session_state.transcript_ids.add(digest)
                                        st.session_state.transcript_feed.insert(
                                            0,
                                            {
                                                "id": digest,
                                                "ts": int(time.time() * 1000),
                                                "text": full_text,
                                            },
                                        )
                                    st.session_state.edited_text = "\n\n".join(
                                        [s["text"] for s in st.session_state.transcript_feed]
                                    )
                                st.success(f"تم إضافة جزء جديد: {eff_start_ms/1000:.2f}s → {end_ms/1000:.2f}s")
                            else:
                                st.info("تم تجاهل جزء مكرر.")
                        else:
                            st.info("لا توجد أجزاء جديدة بعد آخر نقطة.")
            else:
                # Legacy: treat as full wav bytes
                bytes_data = bytes(wav_audio_data)
                st.session_state.audio_data = bytes_data
                st.audio(bytes_data)
                digest = hashlib.md5(bytes_data).hexdigest()
                last_digest = st.session_state.get('_last_component_digest')
                # Only auto-process a digest we haven't seen, to avoid re-running
                # the pipeline on every Streamlit rerun of the same snapshot.
                if st.session_state.auto_process_snapshots and digest != last_digest:
                    st.session_state['_last_component_digest'] = digest
                    run_audio_processing(bytes_data, "snapshot.wav")
                else:
                    if st.button("📝 Extract Text", type="primary", use_container_width=True):
                        st.session_state['_last_component_digest'] = digest
                        run_audio_processing(bytes_data, "recorded_audio.wav")
    # Simplified: removed external live slice server UI to avoid complexity
    # Always show Broadcast view in Step 1 as well (regardless of transcription_data)
    with st.expander("📻 Broadcast (chronological)", expanded=True):
        # Language selector for broadcast translations
        try:
            translator = get_translator()
            langs = translator.get_supported_languages()
            codes = list(langs.keys())
            labels = [f"{code} — {langs[code]}" for code in codes]
            current = st.session_state.get('broadcast_translation_lang', 'ar')
            default_index = codes.index(current) if current in codes else 0
            sel_label = st.selectbox("Broadcast translation language", labels, index=default_index)
            sel_code = sel_label.split(' — ')[0]
            st.session_state.broadcast_translation_lang = sel_code
        except Exception:
            # Translator unavailable: keep the previously selected language.
            sel_code = st.session_state.get('broadcast_translation_lang', 'ar')
        if st.session_state.broadcast_segments:
            for s in st.session_state.broadcast_segments:
                st.markdown(f"**[{s['start_ms']/1000:.2f}s → {s['end_ms']/1000:.2f}s]**")
                st.write(s.get('text', ''))
                # Ensure and show translation in selected language
                if s.get('text') and st.session_state.get('enable_translation', True):
                    if 'translations' not in s or not isinstance(s.get('translations'), dict):
                        s['translations'] = {}
                    if sel_code not in s['translations']:
                        try:
                            tx, _ = get_translator().translate_text(s.get('text', ''), target_language=sel_code)
                            if tx:
                                s['translations'][sel_code] = tx
                        except Exception:
                            pass
                    if s['translations'].get(sel_code):
                        st.caption(f"الترجمة ({sel_code}):")
                        st.write(s['translations'][sel_code])
                st.divider()
        else:
            st.caption("لا توجد أجزاء بعد. استخدم زر Custom أثناء التسجيل.")
# Note: external live slice helper removed to keep the app simple and fully local
# --- Step 2: Review and Customize ---
def step_2_review_and_customize():
    """Render Step 2: transcription/translation review, broadcast and feed
    views, the text editor, video styling, and the Arabic explanation tool."""
    st.header("✅ Extracted Text & Translation")
    # Display translation results if available
    if st.session_state.transcription_data.get('translation_success', False):
        st.success(f"🌐 Translation completed! Detected language: {st.session_state.transcription_data.get('detected_language', 'N/A')}")
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("Original Text")
            st.text_area("Original Transcription", value=st.session_state.transcription_data['text'], height=150, key="original_text_area")
            # BUG FIX: the previous on_click passed kwargs={'clipboard': ...}
            # to a zero-argument lambda, raising TypeError when clicked.
            st.button("📋 Copy Original Text", on_click=lambda: st.toast("Copied to clipboard!"))
        with col2:
            st.subheader(f"Translation ({st.session_state.target_language.upper()})")
            st.text_area("Translated Text", value=st.session_state.transcription_data['translated_text'], height=150, key="translated_text_area")
            st.button("📋 Copy Translated Text", on_click=lambda: st.toast("Copied to clipboard!"))
    # Broadcast view (chronological)
    with st.expander("📻 Broadcast (chronological)", expanded=True):
        # Use the same selected language from Step 1 (do not duplicate selector to avoid state conflicts)
        sel_code = st.session_state.get('broadcast_translation_lang', 'ar')
        if st.session_state.broadcast_segments:
            for s in st.session_state.broadcast_segments:
                st.markdown(f"**[{s['start_ms']/1000:.2f}s → {s['end_ms']/1000:.2f}s]**")
                st.write(s.get('text', ''))
                # Ensure and show translation in selected language
                if s.get('text') and st.session_state.get('enable_translation', True):
                    if 'translations' not in s or not isinstance(s.get('translations'), dict):
                        s['translations'] = {}
                    if sel_code not in s['translations']:
                        try:
                            tx, _ = get_translator().translate_text(s.get('text', ''), target_language=sel_code)
                            if tx:
                                s['translations'][sel_code] = tx
                        except Exception:
                            # Best-effort: leave the segment untranslated on failure.
                            pass
                    if s['translations'].get(sel_code):
                        st.caption(f"الترجمة ({sel_code}):")
                        st.write(s['translations'][sel_code])
                st.divider()
        else:
            st.caption("لا توجد أجزاء بعد. استخدم زر Custom أثناء التسجيل.")
    # Show transcript feed (newest first) with simple dedupe behavior
    with st.expander("🧾 Transcript Feed (latest first)", expanded=True):
        if st.session_state.transcript_feed:
            for i, s in enumerate(st.session_state.transcript_feed):
                st.markdown(f"**Snippet {i+1}** — id: `{s['id']}`")
                st.write(s['text'])
                st.divider()
        else:
            st.caption("No transcript snippets yet. Take a snapshot to populate this feed.")
    col1, col2 = st.columns([3, 2])
    with col1:
        st.subheader("📝 Text Editor")
        st.info("Edit the original transcribed text below. This text will be used for the final export.")
        edited_text = st.text_area("Transcribed Text", value=st.session_state.edited_text, height=300)
        st.session_state.edited_text = edited_text
    with col2:
        st.subheader("🎨 Video Style Customization")
        st.session_state.video_style['animation_style'] = st.selectbox("Animation Style", ["Karaoke Style", "Pop-up Word"])
        st.session_state.video_style['text_color'] = st.color_picker("Text Color", st.session_state.video_style['text_color'])
        st.session_state.video_style['highlight_color'] = st.color_picker("Highlight Color", st.session_state.video_style['highlight_color'])
    st.divider()
    st.subheader("🧠 شرح تعليمي بالعربية (غير ترجمة حرفية)")
    st.info("هذا القسم يقدّم شرحًا تفصيليًا ومنظمًا باللغة العربية للمحتوى، مع أمثلة ونقاط مُلخّصة، ليس ترجمة حرفية.")
    # Determine source for explanation: prefer Arabic translation if available, else original
    td = st.session_state.transcription_data
    if td.get('translation_success') and td.get('translated_text'):
        source_text = td['translated_text']
    else:
        source_text = td.get('text', '')
    if 'arabic_explanation' not in st.session_state:
        st.session_state.arabic_explanation = None
    colE, colF = st.columns([1, 4])
    with colE:
        if st.button("✍️ توليد الشرح بالعربية", use_container_width=True):
            translator = get_translator()
            with st.spinner("⏳ جاري توليد الشرح التفصيلي بالعربية..."):
                explained, err = translator.explain_text_arabic(source_text or '')
            if explained:
                st.session_state.arabic_explanation = explained
                st.success("تم إنشاء الشرح بنجاح.")
            else:
                st.error(err or "تعذّر إنشاء الشرح. حاول مجددًا.")
    with colF:
        st.text_area("الشرح العربي التفصيلي", value=st.session_state.arabic_explanation or "", height=350)
# --- Step 3: Export ---
def step_3_export():
    """Render Step 3: MP3-with-lyrics export and the MP4 video placeholder."""
    st.header("⬇️ Export Your Media")
    mp3_col, mp4_col = st.columns(2)
    with mp3_col:
        st.subheader("🎵 MP3 Export")
        if st.button("📱 Export MP3 with Lyrics", type="primary", use_container_width=True):
            export_mp3()
    with mp4_col:
        st.subheader("🎬 MP4 Video Export")
        if st.button("🎥 Generate Video Summary", type="primary", use_container_width=True):
            export_mp4()
    st.divider()
    # "Record Again" functionality is handled by the recorder component itself.
# --- MP3 Export Function ---
def export_mp3():
    """Embed synced SYLT lyrics into the audio and offer the result for download."""
    temp_audio_path = None
    log_to_browser_console("--- INFO: Starting MP3 export process. ---")
    try:
        with st.spinner("⏳ Exporting MP3... Please wait, this may take a moment."):
            data = st.session_state.transcription_data
            # Persist the in-memory audio to disk for the embedder.
            with tempfile.NamedTemporaryFile(delete=False, suffix=data['original_suffix']) as source_file:
                source_file.write(data['audio_bytes'])
                temp_audio_path = source_file.name
            embedder = MP3Embedder()
            # NOTE(review): 'original_filename' is never stored in
            # transcription_data elsewhere in this file, so the stem appears
            # to always fall back to 'audio' — confirm against callers.
            target_name = f"synced_{Path(data.get('original_filename', 'audio')).stem}.mp3"
            output_path, embed_logs = embedder.embed_sylt_lyrics(
                temp_audio_path, data['word_timestamps'],
                st.session_state.edited_text, target_name
            )
            log_to_browser_console(embed_logs)
            st.subheader("✅ Export Complete")
            if os.path.exists(output_path):
                with open(output_path, 'rb') as result_file:
                    result_bytes = result_file.read()
                st.audio(result_bytes, format='audio/mp3')
                verification = embedder.verify_sylt_embedding(output_path)
                if verification.get('has_sylt'):
                    st.success(f"Successfully embedded {verification.get('sylt_entries', 0)} words!")
                else:
                    st.warning("Warning: Could not verify SYLT embedding.")
                st.download_button("Download Synced MP3", result_bytes,
                                   target_name, "audio/mpeg", use_container_width=True)
            else:
                st.error("Failed to create the final MP3 file.")
    except Exception as e:
        st.error(f"An unexpected error occurred during MP3 export: {e}")
        log_to_browser_console(f"--- FATAL ERROR in export_mp3: {traceback.format_exc()} ---")
    finally:
        # Remove the temporary source file regardless of success.
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.unlink(temp_audio_path)
# --- Placeholder and Utility Functions ---
def export_mp4():
    """Placeholder for MP4 export: shows a brief spinner, then a notice."""
    with st.spinner("⏳ Preparing video export..."):
        time.sleep(1)  # Simulate work to provide feedback
    st.info("MP4 export functionality is not yet implemented.")
def reset_session():
    """Clear the workflow-specific session keys and restore defaults."""
    log_to_browser_console("--- INFO: Resetting session state. ---")
    for state_key in ('step', 'audio_data', 'transcription_data',
                      'edited_text', 'video_style', 'new_recording'):
        # pop() with a default removes the key only when present.
        st.session_state.pop(state_key, None)
    initialize_session_state()
# --- Entry Point ---
# Run only when executed directly: verify the API key is configured, seed
# session defaults, then render the app.
if __name__ == "__main__":
    if check_api_key():
        initialize_session_state()
        main()