| | import streamlit as st
|
| | import numpy as np
|
| | from datasets import load_dataset, Audio, Dataset
|
| | import os
|
| | from huggingface_hub import HfApi, login, create_repo, upload_file
|
| | import tempfile
|
| | import time
|
| | import json
|
| | from datetime import datetime
|
| |
|
| |
|
# Streamlit page setup: wide layout gives room for metadata and the audio player.
st.set_page_config(
    page_title="Audio Annotation Tool",
    layout="wide",
)


# Hugging Face dataset this tool annotates (fixed; the sidebar shows it read-only).
DEFAULT_DATASET = "kcrl/Hate-Speech"
|
| |
|
| |
|
# Hugging Face auth token, read from the environment; empty string when unset.
# (os.environ.get never raises, so no try/except is needed; the previous
# redundant `import os` inside the block is dropped — os is imported above.)
DEFAULT_HF_TOKEN = os.environ.get("HF_TOKEN", "")
|
| |
|
| |
|
# Allow-list of annotator names permitted to submit annotations.
AUTHORIZED_ANNOTATORS = [
    "Ayon",
]
|
| |
|
| |
|
def get_session_file_path(annotator_name):
    """Return the per-annotator JSON state-file path.

    Also ensures the "annotation_data" directory exists (callers rely on
    this side effect before reading/writing the file). A falsy name maps
    to the shared "default" state file.
    """
    if annotator_name:
        safe_name = annotator_name.replace(" ", "_").lower()
    else:
        safe_name = "default"
    state_dir = "annotation_data"
    os.makedirs(state_dir, exist_ok=True)
    return os.path.join(state_dir, f"annotation_state_{safe_name}.json")
|
| |
|
| |
|
def save_session_state(annotator_name):
    """Persist the annotator's progress to their JSON state file.

    Best-effort: any failure is surfaced as a Streamlit warning rather
    than raised, so saving never interrupts the annotation flow. A falsy
    name is a no-op.
    """
    if not annotator_name:
        return

    try:
        payload = {
            "current_index": st.session_state.current_index,
            "annotations": st.session_state.annotations,
            "dataset_name": st.session_state.dataset_name,
            "dataset_split": st.session_state.dataset_split,
            "annotator_name": st.session_state.annotator_name,
            "last_updated": datetime.now().isoformat(),
        }
        with open(get_session_file_path(annotator_name), 'w') as f:
            json.dump(payload, f, indent=2)
    except Exception as e:
        st.warning(f"Could not save session state: {str(e)}")
|
| |
|
| |
|
def load_session_state(annotator_name):
    """Restore previously saved progress for *annotator_name*.

    Returns True when a state file was found and loaded into
    st.session_state, False otherwise (missing name, missing file, or
    any error — errors are reported via st.warning, never raised).
    """
    if not annotator_name:
        return False

    try:
        state_file = get_session_file_path(annotator_name)
        if not os.path.exists(state_file):
            return False
        with open(state_file, 'r') as f:
            restored = json.load(f)

        # Re-hydrate the navigation position and annotation map; fall back
        # to fresh defaults for any missing key.
        st.session_state.current_index = restored.get("current_index", 0)
        st.session_state.annotations = restored.get("annotations", {})
        st.session_state.dataset_name = restored.get("dataset_name", DEFAULT_DATASET)
        st.session_state.dataset_split = restored.get("dataset_split", "train")
        return True
    except Exception as e:
        st.warning(f"Could not load session state: {str(e)}")
        return False
|
| |
|
| |
|
# One-time session-state initialization, table-driven. Each entry maps a key
# to a zero-arg factory so that side-effecting defaults (tempfile.mkdtemp)
# only run when the key is actually missing — same semantics as the original
# per-key `if ... not in st.session_state` chain.
_SESSION_DEFAULTS = {
    "current_index": lambda: 0,
    "annotations": lambda: {},
    "dataset_initialized": lambda: False,
    "temp_dir": tempfile.mkdtemp,
    "audio_file": lambda: None,
    "dataset_info": lambda: None,
    "current_sample": lambda: None,
    "dataset_name": lambda: DEFAULT_DATASET,
    "dataset_split": lambda: "train",
    "class_labels": lambda: ["hate", "non-hate", "discard"],
    "annotator_name": lambda: "",
    "hf_token": lambda: DEFAULT_HF_TOKEN,
    "total_samples": lambda: 0,
    "authorized": lambda: False,
    "state_loaded": lambda: False,
}

for _key, _factory in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _factory()
|
| |
|
| |
|
| |
|
# Cache of loaded dataset splits keyed by (dataset_name, split). Without it,
# every Previous/Next/Jump click re-downloaded and re-prepared the ENTIRE
# split just to fetch one row.
_SAMPLE_DATASET_CACHE = {}


def load_single_sample(dataset_name, split, index):
    """Return sample *index* from the given dataset split, or None on failure.

    The split is loaded at most once per session (see _SAMPLE_DATASET_CACHE).
    Errors — including an out-of-range index — are reported via st.error and
    turned into a None return so the UI can keep running.
    """
    try:
        cache_key = (dataset_name, split)
        if cache_key not in _SAMPLE_DATASET_CACHE:
            _SAMPLE_DATASET_CACHE[cache_key] = load_dataset(
                dataset_name,
                split=split
            )
        dataset = _SAMPLE_DATASET_CACHE[cache_key]

        if index < len(dataset):
            sample = dataset[index]

            # If the audio column came back un-decoded (not the usual
            # {"array", "sampling_rate", ...} dict), decode it explicitly.
            if "audio" in sample and not isinstance(sample["audio"], dict):
                audio_feature = Audio()
                sample["audio"] = audio_feature.decode_example(sample["audio"])

            return sample
        else:
            st.error(f"Index {index} is out of range (dataset has {len(dataset)} samples)")
            return None
    except Exception as e:
        st.error(f"Error loading sample {index}: {str(e)}")
        return None
|
| |
|
| |
|
| |
|
# Cache for length lookups keyed by (dataset_name, split): the original
# re-downloaded and re-prepared the whole split every time it was asked
# for a simple len().
_LENGTH_DATASET_CACHE = {}


def get_dataset_length(dataset_name, split):
    """Return the number of samples in the split, or 0 on any failure.

    The split is loaded at most once per session; errors are shown with
    st.error instead of being raised.
    """
    try:
        cache_key = (dataset_name, split)
        if cache_key not in _LENGTH_DATASET_CACHE:
            _LENGTH_DATASET_CACHE[cache_key] = load_dataset(
                dataset_name,
                split=split
            )
        return len(_LENGTH_DATASET_CACHE[cache_key])
    except Exception as e:
        st.error(f"Error getting dataset length: {str(e)}")
        return 0
|
| |
|
| |
|
| |
|
# Page header and short usage blurb.
st.title("Audio Annotation Tool")
st.markdown("""
This tool allows you to annotate audio files from a Hugging Face dataset.
Your progress is automatically saved and will be restored when you return.
""")
|
| |
|
| |
|
with st.sidebar:
    st.header("Configuration")

    # Dataset name is fixed (disabled input); split is editable.
    st.session_state.dataset_name = st.text_input("Hugging Face Dataset Name", value=DEFAULT_DATASET, disabled=True)
    st.session_state.dataset_split = st.text_input("Dataset Split", value=st.session_state.dataset_split)

    st.text("Labels: hate, non-hate, discard")

    annotator_input = st.text_input("Your Name (Annotator)", value=st.session_state.annotator_name)

    # When the annotator name changes, try to restore that annotator's
    # previous session and pre-load their current sample.
    if annotator_input != st.session_state.annotator_name:
        st.session_state.annotator_name = annotator_input
        if annotator_input and not st.session_state.state_loaded:
            state_loaded = load_session_state(annotator_input)
            if state_loaded:
                st.success(f"Loaded previous session for {annotator_input}! You can continue from where you left off.")
                st.session_state.state_loaded = True
                st.session_state.dataset_initialized = True

                try:
                    login(token=DEFAULT_HF_TOKEN)

                    st.session_state.total_samples = get_dataset_length(
                        st.session_state.dataset_name,
                        st.session_state.dataset_split
                    )

                    st.session_state.current_sample = load_single_sample(
                        st.session_state.dataset_name,
                        st.session_state.dataset_split,
                        st.session_state.current_index
                    )
                except Exception as e:
                    st.error(f"Error loading sample: {str(e)}")
                else:
                    # BUGFIX: st.rerun() works by raising Streamlit's internal
                    # RerunException, which is an Exception subclass. It used to
                    # sit inside the try body, so the `except Exception` above
                    # swallowed it and showed a spurious "Error loading sample"
                    # message instead of rerunning. The try/else clause runs
                    # only on success and is NOT covered by the except handler.
                    st.rerun()
|
| |
|
| |
|
# Authorization gate: only names on the allow-list may annotate. An empty
# name triggers neither branch, leaving st.session_state.authorized as-is.
if st.session_state.annotator_name in AUTHORIZED_ANNOTATORS:
    st.sidebar.success("Annotator authorized.")
    st.session_state.authorized = True
elif st.session_state.annotator_name:
    st.sidebar.error(f"Sorry, {st.session_state.annotator_name} is not authorized to annotate this dataset.")
    st.session_state.authorized = False


# The token used by the rest of the app always comes from the environment.
st.session_state.hf_token = DEFAULT_HF_TOKEN
|
| |
|
| |
|
# Dataset initialization: triggered either by the button or automatically
# when a restored session has not yet (re)connected to the dataset.
initialize_button = st.button("Initialize Dataset")
if initialize_button or (st.session_state.state_loaded and not st.session_state.dataset_initialized):
    if not st.session_state.authorized:
        st.error("You are not authorized to annotate this dataset. Please use an authorized annotator name.")
    else:
        try:
            with st.spinner("Initializing dataset connection..."):
                # Authenticate with the Hub before touching the dataset.
                login(token=DEFAULT_HF_TOKEN)

                st.session_state.total_samples = get_dataset_length(
                    st.session_state.dataset_name,
                    st.session_state.dataset_split
                )

                # A zero length means the split is empty or the length lookup
                # failed (get_dataset_length returns 0 on error).
                if st.session_state.total_samples > 0:
                    st.session_state.dataset_initialized = True

                    # Load the sample at the (possibly restored) position.
                    st.session_state.current_sample = load_single_sample(
                        st.session_state.dataset_name,
                        st.session_state.dataset_split,
                        st.session_state.current_index
                    )

                    st.success(f"Dataset initialized! Total samples: {st.session_state.total_samples}")
                    st.info(f"Starting from sample {st.session_state.current_index + 1}")
                else:
                    st.error("Could not determine the size of the dataset or the dataset is empty.")

        except Exception as e:
            st.error(f"Error initializing dataset: {str(e)}")
|
| |
|
| |
|
# ---- Main annotation UI: shown once a dataset is initialized and a sample is loaded.
if st.session_state.dataset_initialized and st.session_state.current_sample:

    st.subheader("Dataset Information")
    st.write(f"Dataset: {st.session_state.dataset_name}")
    st.write(f"Split: {st.session_state.dataset_split}")
    st.write(f"Total samples: {st.session_state.total_samples}")
    st.write(f"Current sample: {st.session_state.current_index + 1}/{st.session_state.total_samples}")
    st.write(f"Annotations completed: {len(st.session_state.annotations)}")

    try:
        audio_sample = st.session_state.current_sample

        # A decoded audio column is a dict with "array" and "sampling_rate"
        # (the datasets Audio feature format); anything else is unusable here.
        if "audio" in audio_sample and isinstance(audio_sample["audio"], dict):
            audio_data = audio_sample["audio"]["array"]
            sample_rate = audio_sample["audio"]["sampling_rate"]

            # Show every non-audio column as metadata for context.
            st.subheader("Audio Metadata")
            metadata_cols = [col for col in audio_sample.keys() if col != "audio"]
            if metadata_cols:
                metadata_display = {}
                for col in metadata_cols:
                    metadata_display[col] = audio_sample[col]
                st.json(metadata_display)

            st.subheader("Audio Player")
            st.audio(audio_data, format="audio/wav", sample_rate=sample_rate)

            st.subheader("Annotation")

            # Derive a stable ID for this clip (used as the annotations key):
            # prefer explicit filename-ish columns, then the decoded audio
            # path, then any string column that looks like an audio filename,
            # then an "id" column, and finally a synthetic index-based name.
            if "file" in audio_sample:
                current_audio_id = audio_sample["file"]
            elif "filename" in audio_sample:
                current_audio_id = audio_sample["filename"]
            elif "path" in audio_sample:
                current_audio_id = audio_sample["path"]
            elif "audio" in audio_sample and "path" in audio_sample["audio"]:
                current_audio_id = os.path.basename(audio_sample["audio"]["path"])
            else:
                audio_fields = [field for field in audio_sample.keys() if field != "audio"]
                filename_found = False

                for field in audio_fields:
                    if isinstance(audio_sample[field], str):
                        if any(ext in audio_sample[field].lower() for ext in ['.wav', '.mp3', '.ogg', '.flac']):
                            current_audio_id = os.path.basename(audio_sample[field])
                            filename_found = True
                            break

                if not filename_found and "id" in audio_sample:
                    current_audio_id = f"{audio_sample['id']}.wav"
                elif not filename_found:
                    current_audio_id = f"audio_{st.session_state.current_index}.wav"

            # Normalize to a bare filename so annotation keys stay path-free.
            current_audio_id = os.path.basename(current_audio_id)

            current_annotation = st.session_state.annotations.get(current_audio_id, None)

            st.caption(f"Audio ID: {current_audio_id}")

            # Pre-select any existing annotation; default to the first label.
            selected_class = st.radio(
                "Select Class Label",
                options=["hate", "non-hate", "discard"],
                index=["hate", "non-hate", "discard"].index(current_annotation) if current_annotation in ["hate", "non-hate", "discard"] else 0
            )

            # NOTE(review): additional_notes is collected but never stored in
            # the annotations payload — confirm whether this is intentional.
            additional_notes = st.text_area("Additional Notes")

            col1, col2, col3 = st.columns(3)

            with col1:
                # Previous: auto-save the current selection, persist state,
                # then step back and reload.
                if st.button("Previous", disabled=st.session_state.current_index <= 0):
                    st.session_state.annotations[current_audio_id] = selected_class
                    save_session_state(st.session_state.annotator_name)

                    st.session_state.current_index = max(0, st.session_state.current_index - 1)

                    with st.spinner("Loading previous sample..."):
                        st.session_state.current_sample = load_single_sample(
                            st.session_state.dataset_name,
                            st.session_state.dataset_split,
                            st.session_state.current_index
                        )

                    # NOTE(review): st.rerun() raises Streamlit's internal
                    # RerunException, and this whole branch is inside the outer
                    # `try ... except Exception` below — the rerun may be
                    # swallowed and reported as an error. Verify against the
                    # Streamlit version in use.
                    st.rerun()

            with col2:
                if st.button("Save Annotation"):
                    if not selected_class:
                        st.warning("Please provide a class label.")
                    elif not st.session_state.annotator_name:
                        st.warning("Please provide your name as the annotator.")
                    else:
                        st.session_state.annotations[current_audio_id] = selected_class

                        try:
                            # Enrich each bare label with annotator + timestamp
                            # for the exported JSON payload.
                            annotations_with_details = {}
                            for audio_id, label in st.session_state.annotations.items():
                                annotations_with_details[audio_id] = {
                                    "label": label,
                                    "annotator": st.session_state.annotator_name,
                                    "timestamp": datetime.now().isoformat()
                                }

                            save_session_state(st.session_state.annotator_name)

                            st.success(f"Sample {st.session_state.current_index + 1} annotated as '{selected_class}'")

                            # Local backup copy in this session's temp dir.
                            annotations_file = os.path.join(st.session_state.temp_dir, "annotations.json")
                            with open(annotations_file, "w") as f:
                                json.dump({
                                    "dataset": st.session_state.dataset_name,
                                    "split": st.session_state.dataset_split,
                                    "annotator": st.session_state.annotator_name,
                                    "annotations": annotations_with_details,
                                    "annotation_date": datetime.now().isoformat()
                                }, f)

                            st.success(f"Annotation saved locally. You can download the annotations file at the end of your session.")

                            # NOTE(review): a checkbox rendered inside a button
                            # branch only exists on the rerun triggered by that
                            # click — its value likely cannot survive to be
                            # checked; confirm this upload path is reachable.
                            if st.checkbox("Save to Hugging Face directly"):
                                try:
                                    # Write the payload to a temp file for upload.
                                    with tempfile.NamedTemporaryFile(suffix='.json', delete=False, mode='w') as f:
                                        json.dump({
                                            "dataset": st.session_state.dataset_name,
                                            "split": st.session_state.dataset_split,
                                            "annotator": st.session_state.annotator_name,
                                            "annotations": annotations_with_details,
                                            "annotation_date": datetime.now().isoformat()
                                        }, f)
                                        temp_filepath = f.name

                                    # Upload into a private "<dataset>-annotations" repo.
                                    repo_id = f"{DEFAULT_DATASET}-annotations"
                                    try:
                                        create_repo(repo_id, private=True, token=DEFAULT_HF_TOKEN, exist_ok=True)
                                    except Exception as e:
                                        st.warning(f"Repository already exists, proceeding with upload: {str(e)}")

                                    # One timestamped file per save, per annotator.
                                    annotations_filename = f"annotations_{st.session_state.annotator_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                                    upload_file(
                                        path_or_fileobj=temp_filepath,
                                        path_in_repo=annotations_filename,
                                        repo_id=repo_id,
                                        token=DEFAULT_HF_TOKEN,
                                        commit_message=f"Annotations by {st.session_state.annotator_name}"
                                    )

                                    st.success(f"Annotations saved to Hugging Face Hub in repository: {repo_id}")

                                    # Clean up the temp upload file.
                                    os.unlink(temp_filepath)

                                except Exception as e:
                                    st.error(f"Error saving to Hugging Face Hub: {str(e)}")
                                    st.warning("Annotations saved locally only. Please download them for backup.")

                        except Exception as e:
                            st.error(f"Error saving annotation: {str(e)}")

            with col3:
                # Next: mirror of Previous — auto-save, persist, step forward.
                if st.button("Next", disabled=st.session_state.current_index >= st.session_state.total_samples - 1):
                    st.session_state.annotations[current_audio_id] = selected_class
                    save_session_state(st.session_state.annotator_name)

                    st.session_state.current_index = min(st.session_state.total_samples - 1, st.session_state.current_index + 1)

                    with st.spinner("Loading next sample..."):
                        st.session_state.current_sample = load_single_sample(
                            st.session_state.dataset_name,
                            st.session_state.dataset_split,
                            st.session_state.current_index
                        )

                    # NOTE(review): same rerun-inside-try concern as "Previous".
                    st.rerun()

            # Overall position indicator (max(1, ...) guards division by zero
            # for a single-sample dataset).
            st.progress(st.session_state.current_index / max(1, st.session_state.total_samples - 1))

            # Offer a download of everything annotated so far.
            if st.session_state.annotations:
                annotations_with_details = {}
                for audio_id, label in st.session_state.annotations.items():
                    annotations_with_details[audio_id] = {
                        "label": label,
                        "annotator": st.session_state.annotator_name,
                        "timestamp": datetime.now().isoformat()
                    }

                annotations_data = {
                    "dataset": st.session_state.dataset_name,
                    "split": st.session_state.dataset_split,
                    "annotator": st.session_state.annotator_name,
                    "annotations": annotations_with_details,
                    "annotation_date": datetime.now().isoformat()
                }
                st.download_button(
                    "Download Annotations as JSON",
                    data=json.dumps(annotations_data, indent=2),
                    file_name=f"audio_annotations_{st.session_state.annotator_name.replace(' ', '_').lower()}.json",
                    mime="application/json"
                )

            # Direct navigation to an arbitrary sample index.
            st.subheader("Jump to Sample")
            col1, col2 = st.columns([3, 1])
            with col1:
                jump_index = st.number_input("Sample Index", min_value=0, max_value=st.session_state.total_samples-1, value=st.session_state.current_index)
            with col2:
                if st.button("Jump"):
                    # Auto-save the current selection before jumping.
                    st.session_state.annotations[current_audio_id] = selected_class
                    save_session_state(st.session_state.annotator_name)

                    st.session_state.current_index = jump_index
                    st.session_state.current_sample = load_single_sample(
                        st.session_state.dataset_name,
                        st.session_state.dataset_split,
                        st.session_state.current_index
                    )
                    # NOTE(review): same rerun-inside-try concern as "Previous".
                    st.rerun()
        else:
            st.error("No audio data found in the current sample. Make sure the dataset has an 'audio' column.")

    except Exception as e:
        st.error(f"Error displaying audio: {str(e)}")
        st.write("Error details:", str(e))

else:
    st.info("Please configure and initialize a dataset using the sidebar options.")
|
| |
|
| |
|
# Static footer: usage instructions and implementation notes.
st.markdown("---")
st.markdown("""
### Instructions:
1. Enter your name as the annotator (must be on the authorized list)
2. Click "Initialize Dataset" to begin annotation
3. Listen to each audio sample and annotate as:
   - hate: Contains hate speech
   - non-hate: Does not contain hate speech
   - discard: Cannot be categorized or poor audio quality
4. Save your annotations regularly
5. Your progress is automatically saved and will be restored when you return

### Implementation Notes:
- This tool remembers your position and annotations between sessions
- Annotations are saved with original audio filenames as keys
- Only authorized annotators can submit annotations
""")