"""Audio Labeling Tool — Streamlit Application Entry Point.""" import base64 import logging import os import streamlit as st from admin_panel import render_admin_panel from auth import authenticate, login, logout from audio_loader import copy_to_clean, load_audio_bytes from config import load_config from csv_persistence import save_label from models import LabelRecord from reference import load_reference from resume import build_file_list, compute_resume_index from skip_persistence import save_skip # --- Logging Setup --- LOG_DIR = os.environ.get("LOG_DIR", "/app/logs") os.makedirs(LOG_DIR, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[ logging.FileHandler(os.path.join(LOG_DIR, "app.log")), logging.StreamHandler(), ], ) logger = logging.getLogger(__name__) # --- Page Config --- st.set_page_config(page_title="Audio Labeling Tool", layout="wide") def init_session_state(): """Initialize session state defaults.""" if "authenticated" not in st.session_state: st.session_state["authenticated"] = False if "username" not in st.session_state: st.session_state["username"] = None if "role" not in st.session_state: st.session_state["role"] = None def render_login(): """Render the login view.""" st.title("Audio Labeling Tool") st.subheader("Login") username = st.text_input("Username", key="login_username") password = st.text_input("Password", type="password", key="login_password") if st.button("Login"): if not username or not password: st.error("Please enter both username and password.") return try: role = authenticate(username, password) if role: login(username, role) st.rerun() else: st.error("Invalid username or password.") except Exception: st.error("Invalid username or password.") def get_labeler_config(username: str) -> dict: """Get the configuration for the current labeler.""" config = load_config() return config["labelers"][username] def initialize_labeling_session(username: str): """Initialize the labeling session: build file list, load reference, compute resume.""" if "file_list" in st.session_state: return # Already initialized labeler_cfg = get_labeler_config(username) audio_folder = labeler_cfg["audio_folder"] reference_json = labeler_cfg["reference_json"] output_dir = labeler_cfg["output_dir"] csv_path = os.path.join(output_dir, f"{username}_metadata.csv") # Build file list file_list = build_file_list(audio_folder) st.session_state["file_list"] = file_list st.session_state["audio_folder"] = audio_folder st.session_state["csv_path"] = csv_path st.session_state["output_dir"] = output_dir st.session_state["clean_audios_dir"] = labeler_cfg["clean_audios_dir"] # Load reference JSON try: reference = load_reference(reference_json) st.session_state["reference"] = reference st.session_state["reference_error"] = None except (FileNotFoundError, ValueError) as e: st.session_state["reference"] = {} st.session_state["reference_error"] = str(e) # Compute resume index resume_index = compute_resume_index(file_list, csv_path, username) st.session_state["current_index"] = resume_index def render_audio_player(audio_bytes: bytes): """Render HTML5 audio player with speed control.""" audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") audio_html = f""" """ st.markdown(audio_html, unsafe_allow_html=True) def render_labeling_ui(): """Render the main labeling interface.""" username = st.session_state["username"] # Header with logout col_title, col_logout = st.columns([4, 1]) with col_title: st.title("Audio Labeling Tool") with col_logout: if st.button("Logout"): logout() st.rerun() # Initialize session initialize_labeling_session(username) # Check for reference loading error if st.session_state.get("reference_error"): st.error( "Reference file is corrupted or missing. Please contact admin." ) st.stop() file_list = st.session_state["file_list"] current_index = st.session_state["current_index"] audio_folder = st.session_state["audio_folder"] reference = st.session_state["reference"] # Handle completion if not file_list: st.warning("No audio files found in your assigned folder.") st.stop() if current_index >= len(file_list): st.success("All items have been labeled! You're done.") if st.button("← Go to last item"): st.session_state["current_index"] = len(file_list) - 1 st.rerun() st.stop() # Current file info current_filename = file_list[current_index] # Position indicator st.markdown(f"**{current_index + 1} / {len(file_list)}** — `{current_filename}`") # Audio player try: audio_bytes = load_audio_bytes(audio_folder, current_filename) render_audio_player(audio_bytes) except FileNotFoundError: st.error(f"Audio file not found: {current_filename}. Please contact admin.") # Speed control speed = st.select_slider( "Playback Speed", options=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0], value=1.0, key="speed_slider", ) # Update playback rate via JS st.markdown( f"""""", unsafe_allow_html=True, ) st.divider() # Transcription default_transcription = reference.get(current_filename, "") if not default_transcription and current_filename not in reference: st.warning(f"No transcription found for: {current_filename}") transcription = st.text_area( "Transcription", value=default_transcription, height=100, key=f"transcription_{current_index}", ) # Metadata col_gender, col_pii = st.columns(2) with col_gender: gender = st.radio( "Gender", options=["male", "female"], key=f"gender_{current_index}", ) with col_pii: pii = st.checkbox("Contains PII", key=f"pii_{current_index}") st.divider() # Navigation and action buttons col_prev, col_next, col_apply, col_skip = st.columns(4) with col_prev: prev_disabled = current_index <= 0 if st.button("← Previous", disabled=prev_disabled): st.session_state["current_index"] = current_index - 1 st.rerun() with col_next: next_disabled = current_index >= len(file_list) - 1 if st.button("Next →", disabled=next_disabled): st.session_state["current_index"] = current_index + 1 st.rerun() with col_apply: if st.button("✓ Apply", type="primary"): # Build record record = LabelRecord( source=current_filename, transcription=transcription, gender=gender, pii=pii, labeler=username, ) csv_path = st.session_state["csv_path"] clean_audios_dir = st.session_state["clean_audios_dir"] try: # Save to CSV save_label(record, csv_path) # Copy audio to clean folder copy_to_clean(audio_folder, current_filename, clean_audios_dir) # Advance pointer only on success st.session_state["current_index"] = current_index + 1 st.rerun() except IOError as e: st.error(str(e)) # Pointer NOT advanced with col_skip: skip_disabled = current_index >= len(file_list) - 1 if st.button("Skip ✗", disabled=skip_disabled): st.session_state["show_skip_reason"] = True st.rerun() # Skip reason dialog if st.session_state.get("show_skip_reason", False): st.divider() st.markdown("**Why are you skipping this audio?**") config = load_config() skip_reasons = config.get("skip_reasons", ["Other"]) reason_choice = st.selectbox( "Select reason", options=skip_reasons, key=f"skip_reason_select_{current_index}", ) custom_reason = "" if reason_choice == "Other": custom_reason = st.text_input( "Please specify:", key=f"skip_custom_reason_{current_index}" ) col_confirm, col_cancel = st.columns(2) with col_confirm: if st.button("Confirm Skip"): final_reason = custom_reason if reason_choice == "Other" else reason_choice if reason_choice == "Other" and not custom_reason.strip(): st.error("Please provide a reason.") else: shared_output_dir = config["shared_output_dir"] skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv") try: save_skip(username, current_filename, final_reason, skip_csv_path) st.session_state["show_skip_reason"] = False st.session_state["current_index"] = current_index + 1 st.rerun() except IOError as e: st.error(str(e)) with col_cancel: if st.button("Cancel"): st.session_state["show_skip_reason"] = False st.rerun() def main(): """Main application entry point.""" init_session_state() if st.session_state["authenticated"]: role = st.session_state.get("role") if role == "admin": render_admin_panel() else: render_labeling_ui() else: render_login() if __name__ == "__main__": main()