Spaces:
Sleeping
Sleeping
File size: 10,710 Bytes
d7efa84 a757e52 d7efa84 8e4014a d7efa84 8e4014a d7efa84 8e4014a d7efa84 8e4014a d7efa84 8e4014a d7efa84 8e4014a d7efa84 632b5d2 d7efa84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 | """Audio Labeling Tool — Streamlit Application Entry Point."""
import base64
import logging
import os
import streamlit as st
from admin_panel import render_admin_panel
from auth import authenticate, login, logout
from audio_loader import copy_to_clean, load_audio_bytes
from config import load_config
from csv_persistence import save_label
from models import LabelRecord
from reference import load_reference
from resume import build_file_list, compute_resume_index
from skip_persistence import save_skip
# --- Logging Setup ---
# Directory that receives app.log; override with the LOG_DIR environment variable.
LOG_DIR = os.environ.get("LOG_DIR", "/app/logs")
os.makedirs(LOG_DIR, exist_ok=True)

# Streamlit re-executes this script on every user interaction.
# logging.basicConfig() does nothing once the root logger already has
# handlers, but its arguments are still evaluated, so an unguarded call would
# construct a new FileHandler (and open app.log) on every rerun only to
# discard it. Build the handlers only when they will actually be installed.
if not logging.getLogger().handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[
            # Explicit encoding so log records containing non-ASCII text
            # do not depend on the container's locale.
            logging.FileHandler(os.path.join(LOG_DIR, "app.log"), encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )

logger = logging.getLogger(__name__)

# --- Page Config ---
st.set_page_config(page_title="Audio Labeling Tool", layout="wide")
def init_session_state():
    """Seed ``st.session_state`` with the login-related keys that are still missing.

    Existing values are left untouched, so calling this on every rerun is safe.
    """
    defaults = (
        ("authenticated", False),
        ("username", None),
        ("role", None),
    )
    for key, initial_value in defaults:
        if key not in st.session_state:
            st.session_state[key] = initial_value
def render_login():
    """Render the login form and sign the user in on a successful attempt.

    When the "Login" button is pressed with both fields filled in,
    ``authenticate`` is called; a truthy result is treated as the user's role,
    passed to ``login`` and followed by a rerun. Any other outcome shows an
    error message and leaves the session unauthenticated.
    """
    st.title("Audio Labeling Tool")
    st.subheader("Login")
    username = st.text_input("Username", key="login_username")
    password = st.text_input("Password", type="password", key="login_password")

    if not st.button("Login"):
        return

    if not username or not password:
        st.error("Please enter both username and password.")
        return

    try:
        role = authenticate(username, password)
        if role:
            login(username, role)
    except Exception:
        # The user still sees the generic message, but the real cause
        # is recorded instead of being silently discarded.
        logger.exception("Unexpected error while processing a login attempt")
        st.error("Invalid username or password.")
        return

    if role:
        # Kept outside the try block so the rerun is never mixed up with
        # error handling for the authentication calls.
        st.rerun()
    else:
        st.error("Invalid username or password.")
def get_labeler_config(username: str) -> dict:
    """Return the entry stored under *username* in the config's ``labelers`` table.

    Raises:
        KeyError: If the config has no ``labelers`` section or no entry for
            *username*.
    """
    labelers = load_config()["labelers"]
    return labelers[username]
def initialize_labeling_session(username: str):
    """Populate ``st.session_state`` for a labeling session, once per session.

    Reads the labeler's config entry, builds the audio file list, loads the
    reference transcriptions and computes the index to resume from. The
    presence of ``"file_list"`` in the session state marks the session as
    initialized, so later calls return immediately.

    Session keys written: ``audio_folder``, ``csv_path``, ``output_dir``,
    ``clean_audios_dir``, ``reference``, ``reference_error``,
    ``current_index`` and ``file_list``.

    Args:
        username: Labeler whose config entry and metadata CSV are used.
    """
    if "file_list" in st.session_state:
        return  # Already initialized

    labeler_cfg = get_labeler_config(username)
    audio_folder = labeler_cfg["audio_folder"]
    reference_json = labeler_cfg["reference_json"]
    output_dir = labeler_cfg["output_dir"]
    clean_audios_dir = labeler_cfg["clean_audios_dir"]
    csv_path = os.path.join(output_dir, f"{username}_metadata.csv")

    # Do all the work that can fail BEFORE touching the session state, so a
    # failure never leaves a half-initialized session behind.
    file_list = build_file_list(audio_folder)

    try:
        reference = load_reference(reference_json)
        reference_error = None
    except (FileNotFoundError, ValueError) as e:
        logger.error("Could not load reference file %s: %s", reference_json, e)
        reference = {}
        reference_error = str(e)

    resume_index = compute_resume_index(file_list, csv_path, username)

    st.session_state["audio_folder"] = audio_folder
    st.session_state["csv_path"] = csv_path
    st.session_state["output_dir"] = output_dir
    st.session_state["clean_audios_dir"] = clean_audios_dir
    st.session_state["reference"] = reference
    st.session_state["reference_error"] = reference_error
    st.session_state["current_index"] = resume_index
    # Written last: this key is the "already initialized" marker checked above.
    st.session_state["file_list"] = file_list
def render_audio_player(audio_bytes: bytes, playback_rate=None, mime_type: str = "audio/wav"):
    """Render an HTML5 audio player for *audio_bytes* at the requested speed.

    The audio is embedded as a base64 data URI. The player is rendered through
    ``st.components.v1.html`` because ``<script>`` tags injected with
    ``st.markdown(..., unsafe_allow_html=True)`` are not executed, which left
    the playback-rate script inert. Inside the component's iframe the script
    runs next to the ``<audio>`` element it targets.

    Args:
        audio_bytes: Raw audio file content.
        playback_rate: Playback speed multiplier. When ``None``, the value of
            the ``"speed_slider"`` widget in ``st.session_state`` is used,
            falling back to ``1.0`` if that key is absent.
        mime_type: MIME type announced for the embedded audio.
    """
    # Imported here so this function carries its own dependency on the
    # components API without touching the module's import block.
    import streamlit.components.v1 as components

    if playback_rate is None:
        playback_rate = st.session_state.get("speed_slider", 1.0)
    rate = float(playback_rate)

    audio_b64 = base64.b64encode(audio_bytes).decode("ascii")
    audio_html = f"""
<audio id="audio-player" controls style="width: 100%;">
<source src="data:{mime_type};base64,{audio_b64}" type="{mime_type}">
Your browser does not support the audio element.
</audio>
<script>
var audio = document.getElementById('audio-player');
if (audio) {{ audio.playbackRate = {rate}; }}
</script>
"""
    # Fixed height: just enough for the browser's native audio controls.
    components.html(audio_html, height=80)
def _set_current_index(new_index: int):
    """Move the labeling pointer to *new_index*, close the skip dialog, and rerun."""
    st.session_state["current_index"] = new_index
    # The skip dialog belongs to the item it was opened on; never carry it
    # over to a different item.
    st.session_state["show_skip_reason"] = False
    st.rerun()


def _render_skip_dialog(username: str, current_filename: str, current_index: int):
    """Render the skip-reason form and record the skip when it is confirmed."""
    st.divider()
    st.markdown("**Why are you skipping this audio?**")
    config = load_config()
    skip_reasons = config.get("skip_reasons", ["Other"])
    reason_choice = st.selectbox(
        "Select reason",
        options=skip_reasons,
        key=f"skip_reason_select_{current_index}",
    )
    custom_reason = ""
    if reason_choice == "Other":
        custom_reason = st.text_input(
            "Please specify:", key=f"skip_custom_reason_{current_index}"
        )

    col_confirm, col_cancel = st.columns(2)
    with col_confirm:
        if st.button("Confirm Skip"):
            final_reason = custom_reason if reason_choice == "Other" else reason_choice
            if reason_choice == "Other" and not custom_reason.strip():
                st.error("Please provide a reason.")
            else:
                shared_output_dir = config["shared_output_dir"]
                skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv")
                try:
                    save_skip(username, current_filename, final_reason, skip_csv_path)
                except IOError as e:
                    logger.exception("Failed to record skip for %s", current_filename)
                    st.error(str(e))
                else:
                    # Advance only after the skip was written successfully.
                    _set_current_index(current_index + 1)
    with col_cancel:
        if st.button("Cancel"):
            st.session_state["show_skip_reason"] = False
            st.rerun()


def render_labeling_ui():
    """Render the main labeling interface for the logged-in labeler.

    Layout, top to bottom: header with logout, position indicator, audio
    player with speed control, transcription and metadata inputs, the
    Previous / Next / Apply / Skip buttons, and (when requested) the
    skip-reason dialog. Rendering stops early when the reference file failed
    to load, when there are no audio files, or when every item is done.

    "Apply" saves the label and copies the audio to the clean folder, and
    advances the pointer only if both succeed. Every pointer move also closes
    the skip dialog, so a dialog opened for one file can never be confirmed
    against another.
    """
    username = st.session_state["username"]

    # Header with logout
    col_title, col_logout = st.columns([4, 1])
    with col_title:
        st.title("Audio Labeling Tool")
    with col_logout:
        if st.button("Logout"):
            logout()
            st.rerun()

    # Initialize session
    initialize_labeling_session(username)

    # Check for reference loading error
    if st.session_state.get("reference_error"):
        st.error(
            "Reference file is corrupted or missing. Please contact admin."
        )
        st.stop()

    file_list = st.session_state["file_list"]
    current_index = st.session_state["current_index"]
    audio_folder = st.session_state["audio_folder"]
    reference = st.session_state["reference"]

    # Handle completion
    if not file_list:
        st.warning("No audio files found in your assigned folder.")
        st.stop()
    if current_index >= len(file_list):
        st.success("All items have been labeled! You're done.")
        if st.button("β Go to last item"):
            _set_current_index(len(file_list) - 1)
        st.stop()

    # Current file info
    current_filename = file_list[current_index]
    last_index = len(file_list) - 1

    # Position indicator
    st.markdown(f"**{current_index + 1} / {len(file_list)}** β `{current_filename}`")

    # Audio player
    try:
        audio_bytes = load_audio_bytes(audio_folder, current_filename)
        render_audio_player(audio_bytes)
    except FileNotFoundError:
        st.error(f"Audio file not found: {current_filename}. Please contact admin.")

    # Speed control
    speed = st.select_slider(
        "Playback Speed",
        options=[0.5, 0.75, 1.0, 1.25, 1.5, 2.0],
        value=1.0,
        key="speed_slider",
    )
    # Update playback rate via JS
    # NOTE(review): script tags injected through st.markdown do not appear to
    # be executed by Streamlit, so this snippet is likely inert - confirm.
    st.markdown(
        f"""<script>
var audio = document.getElementById('audio-player');
if (audio) {{ audio.playbackRate = {speed}; }}
</script>""",
        unsafe_allow_html=True,
    )

    st.divider()

    # Transcription
    default_transcription = reference.get(current_filename, "")
    if not default_transcription and current_filename not in reference:
        st.warning(f"No transcription found for: {current_filename}")
    transcription = st.text_area(
        "Transcription",
        value=default_transcription,
        height=100,
        key=f"transcription_{current_index}",
    )

    # Metadata
    col_gender, col_pii = st.columns(2)
    with col_gender:
        gender = st.radio(
            "Gender",
            options=["male", "female"],
            key=f"gender_{current_index}",
        )
    with col_pii:
        pii = st.checkbox("Contains PII", key=f"pii_{current_index}")

    st.divider()

    # Navigation and action buttons
    col_prev, col_next, col_apply, col_skip = st.columns(4)
    with col_prev:
        if st.button("β Previous", disabled=current_index <= 0):
            _set_current_index(current_index - 1)
    with col_next:
        if st.button("Next β", disabled=current_index >= last_index):
            _set_current_index(current_index + 1)
    with col_apply:
        if st.button("β Apply", type="primary"):
            record = LabelRecord(
                source=current_filename,
                transcription=transcription,
                gender=gender,
                pii=pii,
                labeler=username,
            )
            csv_path = st.session_state["csv_path"]
            clean_audios_dir = st.session_state["clean_audios_dir"]
            try:
                # Save to CSV, then copy audio to clean folder
                save_label(record, csv_path)
                copy_to_clean(audio_folder, current_filename, clean_audios_dir)
            except IOError as e:
                # Pointer NOT advanced
                logger.exception("Failed to apply label for %s", current_filename)
                st.error(str(e))
            else:
                # Advance pointer only on success
                _set_current_index(current_index + 1)
    with col_skip:
        # NOTE(review): Skip is disabled on the last item, so the final file
        # can only be applied, never skipped - confirm this is intended.
        if st.button("Skip β", disabled=current_index >= last_index):
            st.session_state["show_skip_reason"] = True
            st.rerun()

    # Skip reason dialog
    if st.session_state.get("show_skip_reason", False):
        _render_skip_dialog(username, current_filename, current_index)
def main():
    """Entry point: show the login form, the admin panel, or the labeling UI.

    Unauthenticated sessions get the login view; authenticated sessions are
    routed by role, with ``"admin"`` going to the admin panel and every other
    role to the labeling interface.
    """
    init_session_state()

    if not st.session_state["authenticated"]:
        render_login()
        return

    is_admin = st.session_state.get("role") == "admin"
    view = render_admin_panel if is_admin else render_labeling_ui
    view()


if __name__ == "__main__":
    main()
|