"""Admin panel for tracking labeler progress and activity.""" import csv import logging import os from pathlib import Path import streamlit as st from config import load_config logger = logging.getLogger(__name__) def render_admin_panel(): """Render the admin dashboard.""" config = load_config() # Header with logout col_title, col_logout = st.columns([4, 1]) with col_title: st.title("Admin Panel") with col_logout: from auth import logout if st.button("Logout"): logout() st.rerun() # Tabs for different views tab_progress, tab_metadata, tab_skipped, tab_activity, tab_downloads = st.tabs( ["Progress", "Metadata CSVs", "Skipped Audios", "Activity Log", "Downloads"] ) with tab_progress: _render_progress(config) with tab_metadata: _render_metadata_viewer(config) with tab_skipped: _render_skipped_viewer(config) with tab_activity: _render_activity_log(config) with tab_downloads: _render_downloads(config) def _read_csv_rows(csv_path: str) -> list[dict]: """Read all rows from a CSV file.""" path = Path(csv_path) if not path.exists(): return [] try: with open(path, "r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) return [row for row in reader] except Exception as e: logger.warning(f"Could not read CSV {csv_path}: {e}") return [] def _count_audio_files(audio_folder: str) -> int: """Count WAV files in a folder.""" folder = Path(audio_folder) if not folder.exists(): return 0 return sum(1 for f in folder.iterdir() if f.suffix.lower() == ".wav" and f.is_file()) def _render_progress(config: dict): """Render progress tracking for each labeler.""" st.subheader("Labeler Progress") shared_output_dir = config["shared_output_dir"] skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv") skip_rows = _read_csv_rows(skip_csv_path) for labeler_name, labeler_cfg in config["labelers"].items(): st.markdown(f"### {labeler_name}") total_files = _count_audio_files(labeler_cfg["audio_folder"]) csv_path = os.path.join(labeler_cfg["output_dir"], f"{labeler_name}_metadata.csv") labeled_rows = _read_csv_rows(csv_path) labeled_count = len(labeled_rows) # Count skips for this labeler skipped_count = sum(1 for r in skip_rows if r.get("labeler") == labeler_name) remaining = max(0, total_files - labeled_count - skipped_count) col1, col2, col3, col4 = st.columns(4) col1.metric("Total", total_files) col2.metric("Labeled", labeled_count) col3.metric("Skipped", skipped_count) col4.metric("Remaining", remaining) if total_files > 0: progress = (labeled_count + skipped_count) / total_files st.progress(min(progress, 1.0)) else: st.progress(0.0) st.divider() def _render_metadata_viewer(config: dict): """Render metadata CSV viewer for each labeler.""" st.subheader("Metadata CSVs") for labeler_name, labeler_cfg in config["labelers"].items(): csv_path = os.path.join(labeler_cfg["output_dir"], f"{labeler_name}_metadata.csv") rows = _read_csv_rows(csv_path) with st.expander(f"{labeler_name} — {len(rows)} entries", expanded=False): if rows: st.dataframe(rows, use_container_width=True) else: st.info("No labels recorded yet.") def _render_skipped_viewer(config: dict): """Render skipped audios CSV viewer.""" st.subheader("Skipped Audios") shared_output_dir = config["shared_output_dir"] skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv") rows = _read_csv_rows(skip_csv_path) if rows: # Filter by labeler labelers = list(config["labelers"].keys()) selected_labeler = st.selectbox( "Filter by labeler", options=["All"] + labelers, key="skip_filter" ) if selected_labeler != "All": rows = [r for r in rows if r.get("labeler") == selected_labeler] st.dataframe(rows, use_container_width=True) st.caption(f"Total skipped: {len(rows)}") else: st.info("No skipped audios recorded yet.") def _render_activity_log(config: dict): """Render recent activity from both metadata and skip CSVs.""" st.subheader("Activity Log") # Collect all activity with timestamps activities = [] # From metadata CSVs (no timestamp column, so we just show them) for labeler_name, labeler_cfg in config["labelers"].items(): csv_path = os.path.join(labeler_cfg["output_dir"], f"{labeler_name}_metadata.csv") rows = _read_csv_rows(csv_path) for row in rows: activities.append({ "labeler": row.get("labeler", labeler_name), "action": "labeled", "source": row.get("source", ""), "details": f"gender={row.get('gender', '')}, pii={row.get('pii', '')}", "timestamp": "", }) # From skip CSV (has timestamps) shared_output_dir = config["shared_output_dir"] skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv") skip_rows = _read_csv_rows(skip_csv_path) for row in skip_rows: activities.append({ "labeler": row.get("labeler", ""), "action": "skipped", "source": row.get("source", ""), "details": row.get("reason", ""), "timestamp": row.get("timestamp", ""), }) # Sort by timestamp (skips have timestamps, labels don't — labels go to end) activities.sort(key=lambda x: x["timestamp"] or "0", reverse=True) if activities: # Show last 50 st.dataframe(activities[:50], use_container_width=True) st.caption(f"Showing latest {min(50, len(activities))} of {len(activities)} total actions.") else: st.info("No activity recorded yet.") def _render_downloads(config: dict): """Render download buttons for CSV files.""" st.subheader("Download Reports") # Metadata CSVs for labeler_name, labeler_cfg in config["labelers"].items(): csv_path = os.path.join(labeler_cfg["output_dir"], f"{labeler_name}_metadata.csv") path = Path(csv_path) if path.exists(): data = path.read_bytes() st.download_button( label=f"📥 {labeler_name}_metadata.csv", data=data, file_name=f"{labeler_name}_metadata.csv", mime="text/csv", key=f"dl_metadata_{labeler_name}", ) else: st.caption(f"{labeler_name}_metadata.csv — not yet created") st.divider() # Skipped CSV shared_output_dir = config["shared_output_dir"] skip_csv_path = os.path.join(shared_output_dir, "skipped_audios.csv") skip_path = Path(skip_csv_path) if skip_path.exists(): data = skip_path.read_bytes() st.download_button( label="📥 skipped_audios.csv", data=data, file_name="skipped_audios.csv", mime="text/csv", key="dl_skipped", ) else: st.caption("skipped_audios.csv — not yet created") st.divider() # App log file log_dir = os.environ.get("LOG_DIR", "/var/log/audio_labeling_tool") log_path = Path(log_dir) / "app.log" if log_path.exists(): data = log_path.read_bytes() st.download_button( label="📥 app.log", data=data, file_name="app.log", mime="text/plain", key="dl_log", ) else: st.caption("app.log — not yet created")