Spaces:
Sleeping
Sleeping
| """Audio Labeling Tool — Streamlit Application Entry Point.""" | |
| import base64 | |
| import logging | |
| import os | |
| import streamlit as st | |
| from admin_panel import render_admin_panel | |
| from auth import authenticate, login, logout | |
| from audio_loader import copy_to_clean, load_audio_bytes | |
| from config import load_config | |
| from csv_persistence import save_label | |
| from models import LabelRecord | |
| from reference import load_reference | |
| from resume import build_file_list, compute_resume_index | |
| from skip_persistence import save_skip | |
# --- Logging Setup ---
# Directory for the application log; overridable through the LOG_DIR env var.
LOG_DIR = os.environ.get("LOG_DIR", "/app/logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Streamlit re-executes this script on every user interaction. Once the root
# logger has handlers, logging.basicConfig() does nothing, so building the
# handlers unconditionally would reopen app.log on each rerun only to throw
# the new FileHandler away. Configure logging only when it is not set up yet.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.FileHandler(os.path.join(LOG_DIR, "app.log")),
            logging.StreamHandler(),
        ],
    )
logger = logging.getLogger(__name__)
# --- Page Config ---
# Set the browser tab title and use the wide page layout.
st.set_page_config(layout="wide", page_title="Audio Labeling Tool")
def init_session_state():
    """Seed the session with logged-out defaults for the auth keys.

    Only keys that are not yet present in ``st.session_state`` are written,
    so values set by an earlier run of the script are preserved.
    """
    defaults = {"authenticated": False, "username": None, "role": None}
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
def render_login():
    """Render the login view and sign the user in on valid credentials.

    Shows username and password inputs plus a "Login" button. When the button
    is pressed with both fields filled, the credentials are passed to
    ``authenticate``; a truthy result is treated as the user's role, handed
    to ``login``, and the script is rerun. Every failure, including an
    unexpected exception, shows the same generic error message to the user.
    """
    st.title("Audio Labeling Tool")
    st.subheader("Login")
    username = st.text_input("Username", key="login_username")
    password = st.text_input("Password", type="password", key="login_password")

    if not st.button("Login"):
        return
    if not username or not password:
        st.error("Please enter both username and password.")
        return

    try:
        role = authenticate(username, password)
        if role:
            login(username, role)
            st.rerun()
        else:
            st.error("Invalid username or password.")
    except Exception:
        # The user still sees the generic message, but the real cause (e.g. a
        # broken credentials backend) is recorded instead of vanishing.
        # The password is deliberately never logged.
        logger.exception(
            "Login attempt for user %r failed with an unexpected error", username
        )
        st.error("Invalid username or password.")
def get_labeler_config(username: str) -> dict:
    """Return the entry for *username* from the ``labelers`` config section.

    The configuration is obtained through ``load_config`` on every call. A
    missing ``labelers`` section or username propagates the lookup error
    raised by the config mapping (``KeyError`` for a plain dict).
    """
    labelers = load_config()["labelers"]
    return labelers[username]
def initialize_labeling_session(username: str):
    """Populate session state for *username*'s labeling session, once.

    Reads the labeler's config entry, builds the audio file list, loads the
    reference transcriptions, and computes the index to resume from. The
    results are stored in ``st.session_state`` under ``file_list``,
    ``audio_folder``, ``csv_path``, ``output_dir``, ``clean_audios_dir``,
    ``reference``, ``reference_error`` and ``current_index``.

    The presence of ``file_list`` marks the session as initialized, so all
    values are computed first and written to session state only at the end.
    If any step raises, nothing is stored and the next rerun retries the
    whole initialization instead of running with a half-filled session.

    A ``FileNotFoundError`` or ``ValueError`` from ``load_reference`` is not
    fatal: ``reference`` becomes an empty dict and the message is kept in
    ``reference_error``. Other exceptions propagate to the caller.
    """
    if "file_list" in st.session_state:
        return  # Already initialized

    labeler_cfg = get_labeler_config(username)
    audio_folder = labeler_cfg["audio_folder"]
    reference_json = labeler_cfg["reference_json"]
    output_dir = labeler_cfg["output_dir"]
    clean_audios_dir = labeler_cfg["clean_audios_dir"]
    csv_path = os.path.join(output_dir, f"{username}_metadata.csv")

    # Build file list
    file_list = build_file_list(audio_folder)

    # Load reference JSON
    try:
        reference = load_reference(reference_json)
        reference_error = None
    except (FileNotFoundError, ValueError) as e:
        # The UI only shows a generic message, so keep the detail in the log.
        logger.warning("Could not load reference file %s: %s", reference_json, e)
        reference = {}
        reference_error = str(e)

    # Compute resume index
    resume_index = compute_resume_index(file_list, csv_path, username)

    # Commit everything in one go. "file_list" is written last because its
    # presence is what the guard at the top treats as "initialized".
    new_state = {
        "audio_folder": audio_folder,
        "csv_path": csv_path,
        "output_dir": output_dir,
        "clean_audios_dir": clean_audios_dir,
        "reference": reference,
        "reference_error": reference_error,
        "current_index": resume_index,
        "file_list": file_list,
    }
    for key, value in new_state.items():
        st.session_state[key] = value
def render_audio_player(audio_bytes: bytes, playback_rate=None):
    """Render an HTML5 audio player that honours the selected playback speed.

    The audio is embedded as a base64 ``data:`` URI with the ``audio/wav``
    MIME type. The player is rendered with ``st.components.v1.html`` rather
    than ``st.markdown`` because inline ``<script>`` tags passed to
    ``st.markdown`` are not executed, so the playback rate was never applied.

    Args:
        audio_bytes: Raw bytes of the audio file to play.
        playback_rate: Speed multiplier for playback. When ``None`` (the
            default), the current value of the ``speed_slider`` widget is
            read from ``st.session_state``, falling back to ``1.0`` if the
            widget has not been rendered yet.
    """
    # Local import: only this function needs the components API.
    import streamlit.components.v1 as components

    if playback_rate is None:
        playback_rate = st.session_state.get("speed_slider", 1.0)
    rate = float(playback_rate)

    audio_b64 = base64.b64encode(audio_bytes).decode("ascii")
    # defaultPlaybackRate is set as well because browsers reset playbackRate
    # to the default when the media resource is (re)loaded.
    audio_html = f"""
    <audio id="audio-player" controls style="width: 100%;">
      <source src="data:audio/wav;base64,{audio_b64}" type="audio/wav">
      Your browser does not support the audio element.
    </audio>
    <script>
      var audio = document.getElementById('audio-player');
      if (audio) {{
        audio.defaultPlaybackRate = {rate};
        audio.playbackRate = {rate};
      }}
    </script>
    """
    # Fixed height: just enough for the native audio controls in the iframe.
    components.html(audio_html, height=80)
def _go_to_index(new_index: int):
    """Move the labeling pointer to *new_index*, close any skip prompt, rerun."""
    st.session_state["current_index"] = new_index
    # The skip prompt belongs to the item it was opened on; never carry it
    # over to another item.
    st.session_state["show_skip_reason"] = False
    st.rerun()


def _render_skip_dialog(username: str, current_filename: str, current_index: int):
    """Render the skip-reason prompt and record the skip once confirmed.

    The selectable reasons come from the ``skip_reasons`` config entry
    (default ``["Other"]``); choosing "Other" requires a non-blank free-text
    reason. On confirmation the skip is written with ``save_skip`` to
    ``skipped_audios.csv`` inside the configured ``shared_output_dir`` and the
    pointer advances. An ``IOError`` while saving is shown and logged, and the
    pointer stays where it is.
    """
    st.divider()
    st.markdown("**Why are you skipping this audio?**")
    config = load_config()
    skip_reasons = config.get("skip_reasons", ["Other"])
    reason_choice = st.selectbox(
        "Select reason",
        options=skip_reasons,
        key=f"skip_reason_select_{current_index}",
    )
    custom_reason = ""
    if reason_choice == "Other":
        custom_reason = st.text_input(
            "Please specify:", key=f"skip_custom_reason_{current_index}"
        )

    col_confirm, col_cancel = st.columns(2)
    with col_confirm:
        if st.button("Confirm Skip"):
            final_reason = custom_reason if reason_choice == "Other" else reason_choice
            if reason_choice == "Other" and not custom_reason.strip():
                st.error("Please provide a reason.")
            else:
                shared_output_dir = config["shared_output_dir"]
                skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv")
                try:
                    save_skip(username, current_filename, final_reason, skip_csv_path)
                except IOError as e:
                    logger.error(
                        "Failed to record skip of %s for %s: %s",
                        current_filename,
                        username,
                        e,
                    )
                    st.error(str(e))
                else:
                    _go_to_index(current_index + 1)
    with col_cancel:
        if st.button("Cancel"):
            st.session_state["show_skip_reason"] = False
            st.rerun()


def render_labeling_ui():
    """Render the main labeling interface for the logged-in labeler.

    Layout, top to bottom: header with logout, position indicator, audio
    player with speed slider, transcription and metadata inputs, then the
    Previous / Next / Apply / Skip buttons and, when requested, the
    skip-reason prompt.

    "Apply" saves the label with ``save_label``, copies the audio with
    ``copy_to_clean`` and advances only if both succeed; an ``IOError`` is
    shown and logged and the pointer stays put. Any navigation (Previous,
    Next, Apply, confirmed Skip) closes an open skip prompt so it cannot
    reappear on a different item.
    """
    username = st.session_state["username"]

    # Header with logout
    col_title, col_logout = st.columns([4, 1])
    with col_title:
        st.title("Audio Labeling Tool")
    with col_logout:
        if st.button("Logout"):
            logout()
            st.rerun()

    # Initialize session (no-op after the first successful run)
    initialize_labeling_session(username)

    # Check for reference loading error
    if st.session_state.get("reference_error"):
        st.error("Reference file is corrupted or missing. Please contact admin.")
        st.stop()

    file_list = st.session_state["file_list"]
    current_index = st.session_state["current_index"]
    audio_folder = st.session_state["audio_folder"]
    reference = st.session_state["reference"]
    last_index = len(file_list) - 1

    # Handle completion
    if not file_list:
        st.warning("No audio files found in your assigned folder.")
        st.stop()
    if current_index >= len(file_list):
        st.success("All items have been labeled! You're done.")
        if st.button("← Go to last item"):
            _go_to_index(last_index)
        st.stop()

    # Current file info
    current_filename = file_list[current_index]

    # Position indicator
    st.markdown(f"**{current_index + 1} / {len(file_list)}** — `{current_filename}`")

    # Audio player
    try:
        audio_bytes = load_audio_bytes(audio_folder, current_filename)
        render_audio_player(audio_bytes)
    except FileNotFoundError:
        logger.error("Audio file not found: %s (folder %s)", current_filename, audio_folder)
        st.error(f"Audio file not found: {current_filename}. Please contact admin.")

    # Speed control
    speed = st.select_slider(
        "Playback Speed",
        options=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0],
        value=1.0,
        key="speed_slider",
    )
    # Update playback rate via JS
    # NOTE(review): inline <script> passed through st.markdown is likely not
    # executed by Streamlit -- confirm this has any effect on the player.
    st.markdown(
        f"""<script>
var audio = document.getElementById('audio-player');
if (audio) {{ audio.playbackRate = {speed}; }}
</script>""",
        unsafe_allow_html=True,
    )

    st.divider()

    # Transcription: pre-fill from the reference when an entry exists.
    default_transcription = reference.get(current_filename, "")
    if current_filename not in reference:
        st.warning(f"No transcription found for: {current_filename}")
    transcription = st.text_area(
        "Transcription",
        value=default_transcription,
        height=100,
        key=f"transcription_{current_index}",
    )

    # Metadata (widget keys are per item so values do not leak between items)
    col_gender, col_pii = st.columns(2)
    with col_gender:
        gender = st.radio(
            "Gender",
            options=["male", "female"],
            key=f"gender_{current_index}",
        )
    with col_pii:
        pii = st.checkbox("Contains PII", key=f"pii_{current_index}")

    st.divider()

    # Navigation and action buttons
    col_prev, col_next, col_apply, col_skip = st.columns(4)
    with col_prev:
        if st.button("← Previous", disabled=current_index <= 0):
            _go_to_index(current_index - 1)
    with col_next:
        if st.button("Next →", disabled=current_index >= last_index):
            _go_to_index(current_index + 1)
    with col_apply:
        if st.button("✓ Apply", type="primary"):
            record = LabelRecord(
                source=current_filename,
                transcription=transcription,
                gender=gender,
                pii=pii,
                labeler=username,
            )
            csv_path = st.session_state["csv_path"]
            clean_audios_dir = st.session_state["clean_audios_dir"]
            try:
                # Save to CSV, then copy audio to the clean folder
                save_label(record, csv_path)
                copy_to_clean(audio_folder, current_filename, clean_audios_dir)
            except IOError as e:
                # Pointer NOT advanced on failure
                logger.error(
                    "Failed to apply label for %s by %s: %s",
                    current_filename,
                    username,
                    e,
                )
                st.error(str(e))
            else:
                # Advance pointer only on success
                _go_to_index(current_index + 1)
    with col_skip:
        # NOTE(review): Skip is disabled on the last item, unlike Apply --
        # confirm this is intended and not copied from the Next button.
        if st.button("Skip ✗", disabled=current_index >= last_index):
            st.session_state["show_skip_reason"] = True
            st.rerun()

    # Skip reason dialog
    if st.session_state.get("show_skip_reason", False):
        _render_skip_dialog(username, current_filename, current_index)
def main():
    """Route the current run to the login, admin, or labeling view.

    Session defaults are initialized first. Unauthenticated sessions get the
    login view; authenticated sessions get the admin panel when their role is
    ``"admin"`` and the labeling interface otherwise.
    """
    init_session_state()

    if not st.session_state["authenticated"]:
        render_login()
        return

    if st.session_state.get("role") == "admin":
        render_admin_panel()
    else:
        render_labeling_ui()


if __name__ == "__main__":
    main()