test_ui2 / app.py
valiyevfagan's picture
Update app.py
632b5d2 verified
"""Audio Labeling Tool — Streamlit Application Entry Point."""
import base64
import logging
import os
import streamlit as st
from admin_panel import render_admin_panel
from auth import authenticate, login, logout
from audio_loader import copy_to_clean, load_audio_bytes
from config import load_config
from csv_persistence import save_label
from models import LabelRecord
from reference import load_reference
from resume import build_file_list, compute_resume_index
from skip_persistence import save_skip
# --- Logging Setup ---
# Directory that receives app.log; overridable through the LOG_DIR
# environment variable.
LOG_DIR = os.environ.get("LOG_DIR", "/app/logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Streamlit re-executes this script on every user interaction.
# logging.basicConfig() does nothing once the root logger has handlers, but
# its arguments would still be evaluated, opening a new (immediately
# discarded) FileHandler on each rerun.  Build the handlers only when they
# are actually going to be installed.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            logging.FileHandler(os.path.join(LOG_DIR, "app.log")),
            logging.StreamHandler(),
        ],
    )

logger = logging.getLogger(__name__)
# --- Page Config ---
# Page title shown by the browser and a full-width ("wide") layout.
st.set_page_config(
    layout="wide",
    page_title="Audio Labeling Tool",
)
def init_session_state():
    """Seed the authentication keys of the session state.

    Each of ``authenticated`` (False), ``username`` (None) and ``role`` (None)
    is written only when it is not already present, so values set earlier in
    the session are left untouched.
    """
    logged_out_defaults = (
        ("authenticated", False),
        ("username", None),
        ("role", None),
    )
    for state_key, initial_value in logged_out_defaults:
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value
def render_login():
    """Render the login view and sign the user in on valid credentials.

    Shows username and password inputs plus a "Login" button.  When the
    button is pressed with both fields filled in, the credentials are passed
    to ``authenticate``; a truthy result is treated as the user's role, handed
    to ``login`` and followed by a rerun.  Any failure is reported with the
    same generic message so the UI does not reveal why a login was rejected.
    """
    st.title("Audio Labeling Tool")
    st.subheader("Login")
    username = st.text_input("Username", key="login_username")
    password = st.text_input("Password", type="password", key="login_password")

    if not st.button("Login"):
        return

    if not username or not password:
        st.error("Please enter both username and password.")
        return

    try:
        role = authenticate(username, password)
        if role:
            login(username, role)
    except Exception:
        # UI boundary: keep the generic message for the user, but record the
        # real cause so backend failures are not silently mistaken for bad
        # credentials.  The password is deliberately never logged.
        logger.exception(
            "Unexpected error while authenticating user %r", username
        )
        st.error("Invalid username or password.")
        return

    if not role:
        st.error("Invalid username or password.")
        return

    # st.rerun() interrupts the current script run; it is kept outside the
    # try block so the broad handler above can never intercept it.
    st.rerun()
def get_labeler_config(username: str) -> dict:
    """Return the entry stored for *username* under the config's ``"labelers"`` key.

    The configuration is re-read through ``load_config`` on every call.
    """
    labelers = load_config()["labelers"]
    return labelers[username]
def initialize_labeling_session(username: str):
    """Prepare the labeling session state for *username* (once per session).

    Reads the labeler's configuration, builds the audio file list, loads the
    reference transcriptions and computes the index to resume from.  The
    results are stored in ``st.session_state`` under ``audio_folder``,
    ``csv_path``, ``output_dir``, ``clean_audios_dir``, ``reference``,
    ``reference_error``, ``current_index`` and ``file_list``.

    The presence of ``file_list`` marks the session as initialized, so the
    function returns immediately on later reruns.  A reference file that
    cannot be loaded (``FileNotFoundError`` / ``ValueError``) does not raise:
    ``reference`` becomes an empty dict and ``reference_error`` carries the
    message for the caller to act on.
    """
    if "file_list" in st.session_state:
        return  # Already initialized

    labeler_cfg = get_labeler_config(username)
    audio_folder = labeler_cfg["audio_folder"]
    reference_json = labeler_cfg["reference_json"]
    output_dir = labeler_cfg["output_dir"]
    clean_audios_dir = labeler_cfg["clean_audios_dir"]
    csv_path = os.path.join(output_dir, f"{username}_metadata.csv")

    file_list = build_file_list(audio_folder)

    try:
        reference = load_reference(reference_json)
        reference_error = None
    except (FileNotFoundError, ValueError) as exc:
        logger.error(
            "Could not load reference file %s for %s: %s",
            reference_json,
            username,
            exc,
        )
        reference = {}
        # Fall back to repr() so an exception without a message still yields
        # a truthy error marker.
        reference_error = str(exc) or repr(exc)

    resume_index = compute_resume_index(file_list, csv_path, username)

    # Publish everything only after every step above has succeeded.  The
    # "file_list" sentinel goes last: if anything raised earlier, the next
    # rerun retries initialization instead of finding a half-filled state.
    session_values = {
        "audio_folder": audio_folder,
        "csv_path": csv_path,
        "output_dir": output_dir,
        "clean_audios_dir": clean_audios_dir,
        "reference": reference,
        "reference_error": reference_error,
        "current_index": resume_index,
        "file_list": file_list,
    }
    for state_key, value in session_values.items():
        st.session_state[state_key] = value
def render_audio_player(
    audio_bytes: bytes,
    playback_rate: float = 1.0,
    mime_type: str = "audio/wav",
):
    """Render an HTML5 audio player that plays at the requested speed.

    The audio is embedded as a base64 data URI together with a small script
    that applies the playback rate.  The markup is rendered with
    ``streamlit.components.v1.html`` because ``<script>`` tags injected
    through ``st.markdown`` are not executed by the browser, which would
    leave the playback rate without effect.

    Args:
        audio_bytes: Raw bytes of the audio file.
        playback_rate: Speed multiplier; anything that is not a positive,
            finite number falls back to 1.0.
        mime_type: MIME type announced for the embedded audio.
    """
    # Imported here so the block does not rely on a new file-level import.
    import streamlit.components.v1 as components

    try:
        rate = float(playback_rate)
    except (TypeError, ValueError):
        rate = 1.0
    # NaN fails both comparisons, so it is rejected together with <= 0 and inf.
    if not 0 < rate < float("inf"):
        rate = 1.0

    audio_b64 = base64.b64encode(audio_bytes).decode("ascii")
    audio_html = f"""
<audio id="audio-player" controls style="width: 100%;">
  <source src="data:{mime_type};base64,{audio_b64}" type="{mime_type}">
  Your browser does not support the audio element.
</audio>
<script>
  var audio = document.getElementById('audio-player');
  audio.defaultPlaybackRate = {rate};
  audio.playbackRate = {rate};
</script>
"""
    components.html(audio_html, height=80)


def _go_to_index(index: int) -> None:
    """Move the labeling pointer to *index*, close the skip dialog and rerun."""
    st.session_state["current_index"] = index
    # A dialog opened for the previous file must not reappear on another one.
    st.session_state["show_skip_reason"] = False
    st.rerun()


def _render_skip_dialog(
    username: str, current_filename: str, current_index: int
) -> None:
    """Ask for a skip reason and, on confirmation, record it and advance."""
    st.divider()
    st.markdown("**Why are you skipping this audio?**")

    config = load_config()
    skip_reasons = config.get("skip_reasons", ["Other"])
    reason_choice = st.selectbox(
        "Select reason",
        options=skip_reasons,
        key=f"skip_reason_select_{current_index}",
    )
    is_custom = reason_choice == "Other"
    custom_reason = ""
    if is_custom:
        custom_reason = st.text_input(
            "Please specify:", key=f"skip_custom_reason_{current_index}"
        )

    col_confirm, col_cancel = st.columns(2)
    with col_confirm:
        if st.button("Confirm Skip"):
            final_reason = custom_reason.strip() if is_custom else reason_choice
            if is_custom and not final_reason:
                st.error("Please provide a reason.")
            else:
                skip_csv_path = os.path.join(
                    config["shared_output_dir"], "skipped_audios.csv"
                )
                try:
                    save_skip(
                        username, current_filename, final_reason, skip_csv_path
                    )
                except OSError as exc:  # IOError is an alias of OSError
                    logger.exception(
                        "Failed to record skip of %s", current_filename
                    )
                    st.error(str(exc))
                else:
                    # Advance only once the skip has been persisted.
                    _go_to_index(current_index + 1)
    with col_cancel:
        if st.button("Cancel"):
            st.session_state["show_skip_reason"] = False
            st.rerun()


def render_labeling_ui():
    """Render the main labeling interface for the logged-in labeler.

    Layout, top to bottom: header with logout, position indicator, audio
    player with speed control, transcription and metadata inputs, the
    Previous / Next / Apply / Skip buttons and, when requested, the
    skip-reason dialog.  "Apply" saves the label and copies the audio to the
    clean folder; "Skip" records a reason.  The pointer advances only after
    the corresponding write succeeded.
    """
    username = st.session_state["username"]

    # Header with logout
    col_title, col_logout = st.columns([4, 1])
    with col_title:
        st.title("Audio Labeling Tool")
    with col_logout:
        if st.button("Logout"):
            logout()
            st.rerun()

    initialize_labeling_session(username)

    if st.session_state.get("reference_error"):
        st.error(
            "Reference file is corrupted or missing. Please contact admin."
        )
        st.stop()

    file_list = st.session_state["file_list"]
    current_index = st.session_state["current_index"]
    audio_folder = st.session_state["audio_folder"]
    reference = st.session_state["reference"]

    # Nothing to label / everything labeled
    if not file_list:
        st.warning("No audio files found in your assigned folder.")
        st.stop()
    if current_index >= len(file_list):
        st.success("All items have been labeled! You're done.")
        if st.button("← Go to last item"):
            _go_to_index(len(file_list) - 1)
        st.stop()

    current_filename = file_list[current_index]
    st.markdown(f"**{current_index + 1} / {len(file_list)}** — `{current_filename}`")

    # The player needs the slider value, yet is displayed above the slider:
    # reserve its place first and fill it once the speed is known.
    player_slot = st.container()
    speed = st.select_slider(
        "Playback Speed",
        options=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0],
        value=1.0,
        key="speed_slider",
    )
    with player_slot:
        try:
            audio_bytes = load_audio_bytes(audio_folder, current_filename)
        except FileNotFoundError:
            logger.warning(
                "Audio file not found: %s (folder %s)",
                current_filename,
                audio_folder,
            )
            st.error(
                f"Audio file not found: {current_filename}. Please contact admin."
            )
        else:
            # NOTE(review): a speed change re-renders the player, so playback
            # presumably restarts from the beginning - confirm acceptable.
            render_audio_player(audio_bytes, playback_rate=speed)

    st.divider()

    # Transcription, pre-filled from the reference when available
    default_transcription = reference.get(current_filename, "")
    if current_filename not in reference:
        st.warning(f"No transcription found for: {current_filename}")
    transcription = st.text_area(
        "Transcription",
        value=default_transcription,
        height=100,
        key=f"transcription_{current_index}",
    )

    # Metadata
    col_gender, col_pii = st.columns(2)
    with col_gender:
        gender = st.radio(
            "Gender",
            options=["male", "female"],
            key=f"gender_{current_index}",
        )
    with col_pii:
        pii = st.checkbox("Contains PII", key=f"pii_{current_index}")

    st.divider()

    # Navigation and action buttons
    last_index = len(file_list) - 1
    col_prev, col_next, col_apply, col_skip = st.columns(4)
    with col_prev:
        if st.button("← Previous", disabled=current_index <= 0):
            _go_to_index(current_index - 1)
    with col_next:
        if st.button("Next →", disabled=current_index >= last_index):
            _go_to_index(current_index + 1)
    with col_apply:
        if st.button("✓ Apply", type="primary"):
            record = LabelRecord(
                source=current_filename,
                transcription=transcription,
                gender=gender,
                pii=pii,
                labeler=username,
            )
            csv_path = st.session_state["csv_path"]
            clean_audios_dir = st.session_state["clean_audios_dir"]
            try:
                save_label(record, csv_path)
                copy_to_clean(audio_folder, current_filename, clean_audios_dir)
            except OSError as exc:  # IOError is an alias of OSError
                # Pointer is NOT advanced so the labeler can retry.
                logger.exception(
                    "Failed to apply label for %s", current_filename
                )
                st.error(str(exc))
            else:
                _go_to_index(current_index + 1)
    with col_skip:
        # Skip stays available on the last file as well: like Apply, it moves
        # the pointer past the end, which the completion branch above handles.
        if st.button("Skip ✗"):
            st.session_state["show_skip_reason"] = True
            st.rerun()

    if st.session_state.get("show_skip_reason", False):
        _render_skip_dialog(username, current_filename, current_index)
def main():
    """Entry point: show the login view, the admin panel or the labeling UI.

    Unauthenticated sessions get the login view.  Authenticated sessions whose
    role is ``"admin"`` get the admin panel; every other role gets the
    labeling interface.
    """
    init_session_state()

    if not st.session_state["authenticated"]:
        render_login()
        return

    is_admin = st.session_state.get("role") == "admin"
    view = render_admin_panel if is_admin else render_labeling_ui
    view()


if __name__ == "__main__":
    main()