MeysamSh's picture
Clean up, add recording verification, and cut samples into 2-second segments
78e0f7a
import os
import gradio as gr
import hashlib
import pandas as pd
import zipfile
import tempfile
from datetime import datetime
from huggingface_hub import HfApi, hf_hub_download,CommitOperationDelete
from pathlib import Path
import librosa
import soundfile as sf
import tempfile
import numpy as np
# Numeric code -> sound category name offered in the UI.
label_codes = {"1": "Engine", "2": "Environmental", "3": "Mechanical"}
# Reverse lookup: category name -> numeric code.
label_decoder = dict(zip(label_codes.values(), label_codes.keys()))

# --- CONFIGURATION ---
# Hugging Face dataset repository that stores recordings and metadata.
DATASET_REPO_ID = "MeysamSh/ENSIMSoundDataCollection"
# Both secrets are read from the environment and are None when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
COUPON_SALT = os.getenv("COUPON_SALT")

# Admin Credentials
# ADMIN_PASSWORD is compared against the SHA-256 hex digest of the typed
# password (see admin_login).
# NOTE(review): the digest is committed in source; consider moving it to an
# environment variable like the other secrets.
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "30c8663d3ca10ededd17ac1b55f3d533ab29cf1b8470b1729af09afda3f0a516"

# Each entry is matched against either the lowercased email itself or the
# SHA-256 hex digest of it (see verify_user).
AUTHORIZED_USERS = [
    "5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8",
    "test",
]

# Shared Hugging Face Hub client used by every repository operation below.
api = HfApi()
# --- LOGIC FUNCTIONS ---
def generate_coupon(filename):
    """Return the coupon code for an uploaded file.

    The code is the first 10 hex characters, upper-cased, of the SHA-1 digest
    of ``filename`` concatenated with the module-level ``COUPON_SALT``.
    """
    salted_name = f"{filename}{COUPON_SALT}"
    digest = hashlib.sha1(salted_name.encode()).hexdigest()
    return digest[:10].upper()
def verify_user(email):
    """Check an email against ``AUTHORIZED_USERS`` and toggle the recording zone.

    The email is stripped and lowercased; it is accepted when either that
    value or its SHA-256 hex digest appears in ``AUTHORIZED_USERS``.

    Returns:
        A ``(visibility_update, status_message)`` pair for the UI.
    """
    if not email:
        return gr.update(visible=False), "⚠️ Enter email."

    clean_email = email.strip().lower()
    email_hash = hashlib.sha256(clean_email.encode()).hexdigest()
    is_authorized = any(
        candidate in AUTHORIZED_USERS for candidate in (clean_email, email_hash)
    )
    if not is_authorized:
        return gr.update(visible=False), "🚫 Not authorized."
    return gr.update(visible=True), f"βœ… Access Granted: {clean_email}"
def _split_into_segments(y, sr, duration):
    """Cut a loaded recording into 2-second candidate segments.

    Recordings shorter than 7 seconds yield a single segment (their first
    2 seconds). Longer recordings skip the first 3 seconds and are then cut
    into consecutive, non-overlapping 2-second windows; a trailing remainder
    shorter than one window is dropped.
    """
    window_size = int(2 * sr)
    if duration < 7.0:
        return [y[:window_size]]
    remaining_audio = y[int(3 * sr):]
    return [
        remaining_audio[start:start + window_size]
        for start in range(0, len(remaining_audio) - window_size + 1, window_size)
    ]


def upload_data(email, label, audio_path):
    """Validate, segment and upload a recording to the dataset repository.

    The recording is loaded with librosa, split into 2-second segments, and
    segments whose RMS energy is below ``ENERGY_THRESHOLD`` are discarded.
    Each surviving segment is uploaded as ``data/<name>.wav`` together with a
    one-row CSV under ``metadata/``, and one coupon is generated per segment.

    Args:
        email: Contributor email; it is re-checked against ``AUTHORIZED_USERS``
            here because the value can differ from the one that was verified.
        label: Selected category label.
        audio_path: Path of the recorded audio file, or ``None``.

    Returns:
        A 4-tuple ``(status_message, audio_update, label_update, coupons)``
        matching the outputs wired to the submit button. ``coupons`` is a
        comma-separated string, empty on any failure.
    """
    # --- Energy Threshold Setting ---
    ENERGY_THRESHOLD = 0.02  # Adjust this: 0.01 is very sensitive, 0.05 is strict

    if audio_path is None:
        return "⚠️ Please record or upload a sound file.", None, gr.update(), ""
    if not label:
        return "⚠️ Please select a category label.", gr.update(), gr.update(), ""

    # Resolve the contributor the same way verify_user does: the list may hold
    # either the plain identifier or its SHA-256 digest. Looking up only the
    # plain value would file every hashed user under the same "unknown" index.
    clean_email = (email or "").strip().lower()
    email_hash = hashlib.sha256(clean_email.encode()).hexdigest()
    if clean_email in AUTHORIZED_USERS:
        email_index = AUTHORIZED_USERS.index(clean_email)
    elif email_hash in AUTHORIZED_USERS:
        email_index = AUTHORIZED_USERS.index(email_hash)
    else:
        return "🚫 Not authorized.", gr.update(), gr.update(), ""

    try:
        y, sr = librosa.load(audio_path, sr=None)
        duration = librosa.get_duration(y=y, sr=sr)
        if duration < 2.0:
            return f"⚠️ Sound too short ({duration:.1f}s).", gr.update(), gr.update(), ""

        # --- SPLITTING LOGIC ---
        raw_segments = _split_into_segments(y, sr, duration)

        # --- ENERGY CALCULATION & FILTERING ---
        # RMS energy: sqrt(mean(x^2)); quiet segments are dropped.
        valid_segments = [
            seg for seg in raw_segments
            if np.sqrt(np.mean(seg**2)) >= ENERGY_THRESHOLD
        ]
        rejected_count = len(raw_segments) - len(valid_segments)
        if not valid_segments:
            return f"❌ Rejected: {rejected_count} segments were too quiet. Please record closer to the source.", None, gr.update(), ""

        # --- UPLOAD PROCESS ---
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        coupons = []
        for idx, seg in enumerate(valid_segments):
            seg_filename = f"{email_index}_{timestamp}_seg{idx}.wav"
            # Reserve a temp path, then close the handle before soundfile
            # writes to it by name; the file is removed even if an upload fails.
            tmp_seg = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            tmp_seg.close()
            try:
                sf.write(tmp_seg.name, seg, sr)
                api.upload_file(
                    path_or_fileobj=tmp_seg.name,
                    path_in_repo=f"data/{seg_filename}",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                # NOTE(review): the plain email is stored in the dataset
                # metadata; confirm this is acceptable for the repository.
                meta_content = f"user_id,label,file_name,time,order\n{clean_email},{label},{seg_filename},{timestamp},{idx+1}"
                api.upload_file(
                    path_or_fileobj=meta_content.encode(),
                    path_in_repo=f"metadata/meta_{email_index}_{timestamp}_seg{idx}.csv",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            finally:
                os.unlink(tmp_seg.name)
            coupons.append(generate_coupon(seg_filename))

        status_msg = f"πŸŽ‰ Success! {len(valid_segments)} samples accepted."
        if rejected_count > 0:
            status_msg += f" ({rejected_count} quiet segments discarded)."
        return status_msg, None, gr.update(value=None), ", ".join(coupons)
    except Exception as e:
        return f"❌ Error: {str(e)}", gr.update(), gr.update(), ""
# --- ADMIN LOGIC ---
def delete_all_files(confirm):
    """Delete every file under ``data/`` and ``metadata/`` in one commit.

    Args:
        confirm: Value of the confirmation checkbox; nothing is deleted
            unless it is truthy.

    Returns:
        A ``(status_message, dropdown_update)`` pair for the admin panel.

    NOTE(review): this function performs no credential check of its own; it
    relies on the admin panel being hidden until login. Confirm that is
    sufficient for how the app is exposed.
    """
    if not confirm:
        return "⚠️ You must check the 'Confirm' box to delete everything.", gr.update()

    managed_prefixes = ("data/", "metadata/")
    try:
        repo_listing = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
        doomed = [path for path in repo_listing if path.startswith(managed_prefixes)]
        if not doomed:
            return "ℹ️ The dataset is already empty.", gr.update(choices=[])

        # A single commit carrying all delete operations replaces one API
        # call per file.
        api.create_commit(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            operations=[CommitOperationDelete(path_in_repo=path) for path in doomed],
            commit_message=f"Admin: Bulk delete of {len(doomed)} files",
            token=HF_TOKEN,
        )
        return f"πŸ’₯ Success! Deleted {len(doomed)} files. Dataset is now clean.", gr.update(choices=[], value=None)
    except Exception as e:
        return f"❌ Bulk delete failed: {e!s}", gr.update()
def get_stats():
    """Summarise the dataset repository for the admin panel.

    Lists the repository once, counts ``.wav`` files under ``data/``, derives
    contributors from the filename prefix before the first underscore, and
    reads every ``.csv`` under ``metadata/`` to count recordings per label.

    Returns:
        ``(audio_files, stats_markdown)``; on failure ``([], error_message)``.
    """
    try:
        repo_listing = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
        audio_files = [
            path for path in repo_listing
            if path.startswith("data/") and path.endswith(".wav")
        ]
        metadata_files = [
            path for path in repo_listing
            if path.startswith("metadata/") and path.endswith(".csv")
        ]
        print(f"Found {len(audio_files)} audio files and {len(metadata_files)} metadata files in the repository.")

        # 1. Unique contributors: the part of the file name before the first "_".
        user_indices = {
            path.rsplit("/", 1)[-1].split("_", 1)[0] for path in audio_files
        }

        # 2. Recordings per category; known labels start at zero so they are
        #    always listed, unknown labels are added as they are encountered.
        category_counts = {label_codes[code]: 0 for code in ("1", "2", "3")}
        for m_file in metadata_files:
            try:
                local_copy = hf_hub_download(repo_id=DATASET_REPO_ID, filename=m_file, repo_type="dataset", token=HF_TOKEN)
                with open(local_copy, 'r') as handle:
                    rows = handle.readlines()
                if len(rows) > 1:
                    # Label is the second comma-separated field of the first data row.
                    label = rows[1].split(",")[1].strip()
                    category_counts[label] = category_counts.get(label, 0) + 1
            except Exception:
                # Files that fail to download or parse are skipped.
                print(f"⚠️ Failed to process metadata file: {m_file}")
                continue

        # 3. Assemble the markdown report.
        report_parts = [
            "### πŸ“Š Dataset Statistics\n",
            f"**Total Recordings:** {len(audio_files)} \n",
            f"**Unique Contributors:** {len(user_indices)} \n\n",
            "**Category Breakdown:**\n",
        ]
        report_parts.extend(
            f"- **{cat}:** {count} files\n" for cat, count in category_counts.items()
        )
        return audio_files, "".join(report_parts)
    except Exception as e:
        return [], f"⚠️ Error retrieving stats: {e!s}"
def admin_login(user, pwd):
    """Authenticate the administrator and populate the admin panel.

    The typed password is hashed with SHA-256 and compared with
    ``ADMIN_PASSWORD``; the username is compared with ``ADMIN_USERNAME``.
    Both comparisons use ``hmac.compare_digest`` so their duration does not
    depend on how many leading characters match.

    Args:
        user: Typed username (``None`` is treated as empty).
        pwd: Typed password (``None`` is treated as empty).

    Returns:
        A 4-tuple ``(panel_visibility, dropdown_update, message, stats_text)``
        matching the outputs wired to the admin login button.
    """
    import hmac  # local import: only this function needs it

    pwd_hash = hashlib.sha256((pwd or "").encode()).hexdigest()
    # Compare bytes: compare_digest rejects non-ASCII str arguments.
    user_ok = hmac.compare_digest((user or "").encode(), ADMIN_USERNAME.encode())
    pwd_ok = hmac.compare_digest(pwd_hash.encode(), ADMIN_PASSWORD.encode())
    if user_ok and pwd_ok:
        audio_files, stats_text = get_stats()
        return gr.update(visible=True), gr.update(choices=audio_files), "πŸ”“ Admin Authenticated", stats_text
    return gr.update(visible=False), gr.update(choices=[]), "❌ Invalid Credentials", ""
def delete_selected_file(file_path):
    """Delete one recording and, best-effort, its metadata CSV.

    Args:
        file_path: Repository path of the audio file (``data/<name>.wav``).

    Returns:
        A ``(status_message, dropdown_update)`` pair; on success the dropdown
        is refreshed with the remaining audio files.

    NOTE(review): this function performs no credential check of its own; it
    relies on the admin panel being hidden until login.
    """
    if not file_path:
        return "⚠️ Select a file.", gr.update()
    try:
        api.delete_file(path_in_repo=file_path, repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN)

        # Metadata lives at metadata/meta_<name>.csv. Only the leading folder
        # and the trailing extension are rewritten, so a "data/" or ".wav"
        # elsewhere in the name is left alone.
        meta_path = file_path.replace("data/", "metadata/meta_", 1)
        if meta_path.endswith(".wav"):
            meta_path = meta_path[: -len(".wav")] + ".csv"
        try:
            api.delete_file(path_in_repo=meta_path, repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN)
        except Exception as meta_err:
            # Still best-effort (the metadata file may not exist), but the
            # failure is logged and interpreter-exit signals are not swallowed.
            print(f"⚠️ Could not delete metadata file {meta_path}: {meta_err}")

        audio_files, stats_text = get_stats()
        return f"πŸ—‘οΈ Deleted {file_path}. {stats_text}", gr.update(choices=audio_files, value=None)
    except Exception as e:
        return f"❌ Error: {str(e)}", gr.update()
def access_dataset_zip(email, coupons_str):
    """Build a ZIP of training and test recordings for a student.

    Every metadata CSV in the repository is read. Recordings with an odd
    ``order`` are training samples and are included (with their label) only
    when ``order`` does not exceed the number of coupons supplied. Recordings
    with an even ``order`` are test samples and are always included, with the
    label replaced by ``"HIDDEN"``. A ``metadata_summary.csv`` listing the
    included files is added at the root of the archive.

    Metadata files that cannot be parsed, and metadata whose audio file is no
    longer in the repository, are skipped instead of aborting the whole export.

    Args:
        email: Requester email; only its presence is checked.
        coupons_str: Comma-separated coupon codes.

    Returns:
        ``(zip_path, status_message)``; ``zip_path`` is ``None`` on failure.

    NOTE(review): coupons are only counted, never checked against
    ``generate_coupon``, and ``email`` is not matched to the uploads. Confirm
    whether validation is intended.
    """
    import shutil  # local import: only needed for cleanup on failure

    if not email or not coupons_str:
        return None, "⚠️ Please provide your email and coupons."
    coupons_list = [c.strip().upper() for c in coupons_str.split(",") if c.strip()]
    num_coupons = len(coupons_list)
    if num_coupons == 0:
        return None, "⚠️ No valid coupons provided."

    tmp_dir = None
    try:
        all_files = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
        available = set(all_files)
        meta_files = [
            f for f in all_files
            if f.startswith("metadata/") and f.endswith(".csv")
        ]

        # The directory must outlive this call on success so the UI can
        # serve the file; it is removed only when the export fails.
        tmp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(tmp_dir, "ENSIM_Data_Collection.zip")

        # Rows of the single combined CSV.
        compiled_metadata = []
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for m_file in meta_files:
                local_meta = hf_hub_download(repo_id=DATASET_REPO_ID, filename=m_file, repo_type="dataset", token=HF_TOKEN)
                try:
                    row = pd.read_csv(local_meta).iloc[0]
                    order = int(row['order'])
                    audio_filename = row['file_name']
                    label = row['label']
                except (IndexError, KeyError, ValueError) as meta_err:
                    # Empty file, header-only file, missing column or a
                    # non-numeric order: skip this entry only.
                    print(f"⚠️ Skipping unreadable metadata file {m_file}: {meta_err}")
                    continue

                # --- ACCESS LOGIC ---
                is_training = order % 2 != 0
                if is_training and order > num_coupons:
                    continue  # training sample beyond the coupon allowance

                audio_repo_path = f"data/{audio_filename}"
                if audio_repo_path not in available:
                    # Orphaned metadata: the audio was deleted separately.
                    print(f"⚠️ Skipping {m_file}: {audio_repo_path} is not in the repository")
                    continue

                audio_local = hf_hub_download(repo_id=DATASET_REPO_ID, filename=audio_repo_path, repo_type="dataset", token=HF_TOKEN)
                subset = "training_set" if is_training else "test_set"
                zipf.write(audio_local, arcname=f"{subset}/{audio_filename}")
                compiled_metadata.append({
                    "wav_filename": audio_filename,
                    # Test labels are withheld from the student.
                    "label": label if is_training else "HIDDEN",
                })

            # --- CREATE THE SINGLE CONSOLIDATED CSV ---
            if compiled_metadata:
                master_df = pd.DataFrame(compiled_metadata)
                master_csv_path = os.path.join(tmp_dir, "metadata_summary.csv")
                master_df.to_csv(master_csv_path, index=False, columns=["wav_filename", "label"])
                # Stored at the root of the ZIP.
                zipf.write(master_csv_path, arcname="metadata_summary.csv")

        return zip_path, f"βœ… ZIP created with {len(compiled_metadata)} total references."
    except Exception as e:
        if tmp_dir is not None:
            shutil.rmtree(tmp_dir, ignore_errors=True)
        return None, f"❌ Error: {str(e)}"
# --- UI ---
# Three-tab interface: student upload, dataset download, administration.
with gr.Blocks() as demo:
    gr.Markdown("# πŸŽ™οΈ Sound Data Platform")
    with gr.Tabs():
        # STUDENT TAB
        with gr.TabItem("Dataset Collection"):
            with gr.Row():
                email_input = gr.Textbox(label="Email", placeholder="test")
                login_btn = gr.Button("Verify", variant="primary")
            login_status = gr.Markdown("Waiting for login...")
            # Hidden until verify_user grants access.
            with gr.Column(visible=False) as recording_zone:
                label_input = gr.Radio(choices=[label_codes["1"], label_codes["2"], label_codes["3"]], label="Category")
                audio_input = gr.Audio(label="Record (40s)", sources=["microphone"], type="filepath")
                submit_btn = gr.Button("πŸš€ Submit", variant="primary")
                res_msg = gr.Textbox(label="Status", interactive=False)
                coupon_display = gr.Textbox(label="🎟️ YOUR COUPON (Save this!)", interactive=False)

        # 2. DATASET ACCESS TAB
        with gr.TabItem("Dataset Access"):
            gr.Markdown("""
### πŸ”“ Unlock Your Data Partition
- **Training Data:** You receive Training samples (Audio + Label) proportional to your coupons.
- **Test Data:** You receive the full global Test set (Audio Only) to evaluate your models.
""")
            acc_email = gr.Textbox(label="Email")
            coupons_input = gr.Textbox(label="Coupons List (comma separated)", placeholder="C1, C2, C3...")
            download_btn = gr.Button("πŸ“¦ Generate Data ZIP", variant="primary")
            status_out = gr.Textbox(label="Status")
            file_out = gr.File(label="Download Your Data")

        # ADMIN TAB
        with gr.TabItem("Administration"):
            with gr.Row():
                admin_user = gr.Textbox(label="Admin Username")
                admin_pass = gr.Textbox(label="Admin Password", type="password")
            admin_login_btn = gr.Button("Login Admin")
            admin_msg = gr.Markdown("Log in to manage files.")
            # Filled with the dataset statistics after a successful login.
            admin_stats_display = gr.Markdown("")
            # Hidden until admin_login succeeds.
            with gr.Column(visible=False) as admin_panel:
                file_dropdown = gr.Dropdown(label="Select File to Remove", choices=[])
                delete_btn = gr.Button("πŸ—‘οΈ Delete Selected File", variant="stop")
                delete_status = gr.Textbox(label="Delete Progress")
                gr.Markdown("### 🧨 Danger Zone")
                confirm_check = gr.Checkbox(label="I understand this will permanently delete ALL recordings and metadata.")
                delete_all_btn = gr.Button("πŸ”₯ DELETE ALL DATASET FILES", variant="stop")
                # Separate name: reusing `delete_status` here would rebind it
                # and leave the "Delete Progress" box without any handler.
                delete_all_status = gr.Textbox(label="Status Log")

    # --- EVENT HANDLERS ---
    login_btn.click(verify_user, [email_input], [recording_zone, login_status])
    submit_btn.click(
        fn=upload_data,
        inputs=[email_input, label_input, audio_input],
        outputs=[res_msg, audio_input, label_input, coupon_display],
    )
    admin_login_btn.click(
        admin_login,
        [admin_user, admin_pass],
        [admin_panel, file_dropdown, admin_msg, admin_stats_display],
    )
    # Single-file delete reports into "Delete Progress".
    delete_btn.click(
        delete_selected_file,
        [file_dropdown],
        [delete_status, file_dropdown],
    )
    download_btn.click(
        fn=access_dataset_zip,
        inputs=[acc_email, coupons_input],
        outputs=[file_out, status_out],
    )
    # Bulk delete reports into the Danger Zone "Status Log".
    delete_all_btn.click(
        fn=delete_all_files,
        inputs=[confirm_check],
        outputs=[delete_all_status, file_dropdown],
    )

if __name__ == "__main__":
    # NOTE(review): passing `theme` to launch() requires a Gradio version
    # that accepts it there - confirm against the pinned version.
    demo.launch(theme=gr.themes.Soft())