import hashlib
import os
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path

import gradio as gr
import librosa
import numpy as np
import pandas as pd
import soundfile as sf
from huggingface_hub import HfApi, hf_hub_download, CommitOperationDelete

# Category names keyed by their numeric code.
label_codes = {
    "1": "Engine",
    "2": "Environmental",
    "3": "Mechanical",
}
# Reverse lookup: category name -> numeric code.
label_decoder = {v: k for k, v in label_codes.items()}

# --- CONFIGURATION ---
DATASET_REPO_ID = "MeysamSh/ENSIMSoundDataCollection"
HF_TOKEN = os.environ.get("HF_TOKEN")
# NOTE(review): when COUPON_SALT is unset this is None and coupons are derived
# from the literal text "None" -- confirm the variable is always configured.
COUPON_SALT = os.environ.get("COUPON_SALT")

# Admin Credentials
ADMIN_USERNAME = "admin"
# SHA-256 hex digest of the admin password (admin_login hashes the submitted
# password with SHA-256 and compares it with this value).
ADMIN_PASSWORD = "30c8663d3ca10ededd17ac1b55f3d533ab29cf1b8470b1729af09afda3f0a516"
# Each entry is matched against BOTH the normalized email and its SHA-256 hex
# digest, so entries may be stored either hashed or in plain text.
AUTHORIZED_USERS = [
    "5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8",
    "test",
]

api = HfApi()


# --- LOGIC FUNCTIONS ---
def generate_coupon(filename):
    """Creates a unique string for the student to save.

    The coupon is the first 10 hex characters (upper-cased) of the SHA-1
    digest of ``filename`` concatenated with ``COUPON_SALT``.
    """
    return hashlib.sha1(f"{filename}{COUPON_SALT}".encode()).hexdigest()[:10].upper()


def verify_user(email):
    """Check whether ``email`` is allowed to contribute recordings.

    The email is stripped and lower-cased, then both the resulting text and
    its SHA-256 hex digest are looked up in ``AUTHORIZED_USERS``.

    Returns:
        A 2-tuple ``(visibility update, status message)``: the first item is
        a ``gr.update(visible=...)`` toggling the recording area, the second
        is the text shown to the user.
    """
    if not email:
        return gr.update(visible=False), "⚠️ Enter email."

    clean_email = email.strip().lower()
    email_hash = hashlib.sha256(clean_email.encode()).hexdigest()

    if clean_email in AUTHORIZED_USERS or email_hash in AUTHORIZED_USERS:
        return gr.update(visible=True), f"✅ Access Granted: {clean_email}"
    return gr.update(visible=False), "🚫 Not authorized."
def upload_data(email, label, audio_path):
    """Split a recording into 2-second segments and upload the loud ones.

    Steps:
      1. Load the audio at its native sample rate and reject clips under 2 s.
      2. Clips of 7 s or more: drop the first 3 s and cut the remainder into
         consecutive, non-overlapping 2 s windows. Shorter clips contribute
         only their first 2 s.
      3. Discard segments whose RMS energy is below ``ENERGY_THRESHOLD``.
      4. Upload every remaining segment plus a one-row metadata CSV to the
         dataset repository and issue one coupon per uploaded segment.

    Args:
        email: Contributor email as typed in the UI.
        label: Selected category label.
        audio_path: Path of the recorded audio file, or ``None``.

    Returns:
        A 4-tuple matching the UI outputs
        ``(status message, audio value/update, label update, coupons text)``.
    """
    # --- Energy Threshold Setting ---
    ENERGY_THRESHOLD = 0.02  # Adjust this: 0.01 is very sensitive, 0.05 is strict

    if audio_path is None:
        return "⚠️ Please record or upload a sound file.", None, gr.update(), ""
    if not label:
        return "⚠️ Please select a category label.", gr.update(), gr.update(), ""

    try:
        y, sr = librosa.load(audio_path, sr=None)
        duration = librosa.get_duration(y=y, sr=sr)
        if duration < 2.0:
            return f"⚠️ Sound too short ({duration:.1f}s).", gr.update(), gr.update(), ""

        # --- SPLITTING LOGIC ---
        window_size = int(2 * sr)
        if duration >= 7.0:
            remaining_audio = y[int(3 * sr):]
            raw_segments = [
                remaining_audio[i:i + window_size]
                for i in range(0, len(remaining_audio) - window_size + 1, window_size)
            ]
        else:
            raw_segments = [y[:window_size]]

        # --- ENERGY CALCULATION & FILTERING ---
        # RMS energy: sqrt(mean(x^2))
        valid_segments = [
            seg for seg in raw_segments
            if np.sqrt(np.mean(seg**2)) >= ENERGY_THRESHOLD
        ]
        rejected_count = len(raw_segments) - len(valid_segments)

        if not valid_segments:
            return (
                f"❌ Rejected: {rejected_count} segments were too quiet. "
                "Please record closer to the source."
            ), None, gr.update(), ""

        # --- UPLOAD PROCESS ---
        clean_email = email.strip().lower()
        email_index = _contributor_index(clean_email)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        coupons = []
        for idx, seg in enumerate(valid_segments):
            seg_filename = f"{email_index}_{timestamp}_seg{idx}.wav"

            # Only reserve a unique path here; the handle is closed before
            # soundfile writes to it by name.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_seg:
                tmp_path = tmp_seg.name
            try:
                sf.write(tmp_path, seg, sr)
                api.upload_file(
                    path_or_fileobj=tmp_path,
                    path_in_repo=f"data/{seg_filename}",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                meta_content = f"user_id,label,file_name,time,order\n{clean_email},{label},{seg_filename},{timestamp},{idx+1}"
                api.upload_file(
                    path_or_fileobj=meta_content.encode(),
                    path_in_repo=f"metadata/meta_{email_index}_{timestamp}_seg{idx}.csv",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            finally:
                # Remove the temp file even when an upload raises.
                os.unlink(tmp_path)

            coupons.append(generate_coupon(seg_filename))

        status_msg = f"🎉 Success! {len(valid_segments)} samples accepted."
        if rejected_count > 0:
            status_msg += f" ({rejected_count} quiet segments discarded)."

        return status_msg, None, gr.update(value=None), ", ".join(coupons)

    except Exception as e:
        return f"❌ Error: {str(e)}", gr.update(), gr.update(), ""


def _contributor_index(clean_email):
    """Return the position of a contributor in AUTHORIZED_USERS, or "unknown".

    The list may hold either the plain email or its SHA-256 hex digest, so
    both forms are looked up. Looking up only the plain email would map every
    hash-authorised contributor to the same "unknown" filename prefix.
    """
    email_hash = hashlib.sha256(clean_email.encode()).hexdigest()
    for candidate in (clean_email, email_hash):
        if candidate in AUTHORIZED_USERS:
            return AUTHORIZED_USERS.index(candidate)
    return "unknown"


# --- ADMIN LOGIC ---
# NOTE(review): the delete handlers below perform no credential check of their
# own; they are only protected by `admin_panel` staying hidden until
# `admin_login` succeeds. Confirm that this is sufficient for the deployment.
def delete_all_files(confirm):
    """Delete every file under ``data/`` and ``metadata/`` in one commit.

    Args:
        confirm: Value of the confirmation checkbox; nothing is deleted
            unless it is truthy.

    Returns:
        A 2-tuple ``(status message, dropdown update)``.
    """
    if not confirm:
        return "⚠️ You must check the 'Confirm' box to delete everything.", gr.update()

    try:
        # 1. Get all files in the repo
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )

        # 2. Filter for files in our managed folders
        files_to_delete = [f for f in all_files if f.startswith(("data/", "metadata/"))]
        if not files_to_delete:
            return "ℹ️ The dataset is already empty.", gr.update(choices=[])

        # 3. Use bulk deletion to avoid hundreds of individual API calls
        operations = [CommitOperationDelete(path_in_repo=f) for f in files_to_delete]
        api.create_commit(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Admin: Bulk delete of {len(files_to_delete)} files",
            token=HF_TOKEN,
        )
        return (
            f"💥 Success! Deleted {len(files_to_delete)} files. Dataset is now clean.",
            gr.update(choices=[], value=None),
        )
    except Exception as e:
        return f"❌ Bulk delete failed: {str(e)}", gr.update()


def get_stats():
    """Helper to calculate stats and label distribution from repository.

    Returns:
        A 2-tuple ``(audio_files, stats_md)``: the list of ``data/*.wav``
        paths in the repository and a Markdown summary. On failure the list
        is empty and the text carries the error message.
    """
    try:
        # List all files once to avoid multiple API calls
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        audio_files = [f for f in all_files if f.startswith("data/") and f.endswith(".wav")]
        metadata_files = [f for f in all_files if f.startswith("metadata/") and f.endswith(".csv")]
        print(f"Found {len(audio_files)} audio files and {len(metadata_files)} metadata files in the repository.")

        # 1. Count Unique Contributors (prefix before the first "_" in the file name)
        user_indices = {f.split("/")[-1].split("_")[0] for f in audio_files}

        # 2. Count Files per Category (Label); known labels start at zero so
        # they are always listed, unexpected labels are added as encountered.
        category_counts = {name: 0 for name in label_codes.values()}
        for m_file in metadata_files:
            try:
                # Download and read the small metadata file
                file_path = hf_hub_download(
                    repo_id=DATASET_REPO_ID,
                    filename=m_file,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.readlines()
                if len(content) > 1:
                    # The label is the second column of the single data row.
                    label = content[1].split(",")[1].strip()
                    category_counts[label] = category_counts.get(label, 0) + 1
            except Exception:
                # Skip files that fail to download or parse
                print(f"⚠️ Failed to process metadata file: {m_file}")
                continue

        # 3. Format the stats string
        stats_md = "### 📊 Dataset Statistics\n"
        # Two trailing spaces force a Markdown line break.
        stats_md += f"**Total Recordings:** {len(audio_files)}  \n"
        stats_md += f"**Unique Contributors:** {len(user_indices)} \n\n"
        stats_md += "**Category Breakdown:**\n"
        for cat, count in category_counts.items():
            stats_md += f"- **{cat}:** {count} files\n"

        return audio_files, stats_md
    except Exception as e:
        return [], f"⚠️ Error retrieving stats: {str(e)}"


def admin_login(user, pwd):
    """Authenticate the admin and, on success, load the file list and stats.

    Args:
        user: Submitted username.
        pwd: Submitted password (plain text; hashed with SHA-256 here).

    Returns:
        A 4-tuple ``(panel visibility update, dropdown update, status
        message, statistics Markdown)``.
    """
    import hmac

    pwd_hash = hashlib.sha256((pwd or "").encode()).hexdigest()
    # Constant-time comparison of the two hex digests.
    if user == ADMIN_USERNAME and hmac.compare_digest(pwd_hash, ADMIN_PASSWORD):
        audio_files, stats_text = get_stats()
        return (
            gr.update(visible=True),
            gr.update(choices=audio_files),
            "🔓 Admin Authenticated",
            stats_text,
        )
    return gr.update(visible=False), gr.update(choices=[]), "❌ Invalid Credentials", ""


def delete_selected_file(file_path):
    """Delete one audio file and, best-effort, its metadata CSV.

    Args:
        file_path: Repository path of the audio file (``data/<name>.wav``).

    Returns:
        A 2-tuple ``(status message, dropdown update)``.
    """
    if not file_path:
        return "⚠️ Select a file.", gr.update()

    try:
        api.delete_file(
            path_in_repo=file_path,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )

        # data/<name>.wav  ->  metadata/meta_<name>.csv
        meta_path = file_path.replace("data/", "metadata/meta_").replace(".wav", ".csv")
        try:
            api.delete_file(
                path_in_repo=meta_path,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as meta_err:
            # Still best-effort, but no longer silent.
            print(f"⚠️ Could not delete metadata file {meta_path}: {meta_err}")

        audio_files, stats_text = get_stats()
        return (
            f"🗑️ Deleted {file_path}. {stats_text}",
            gr.update(choices=audio_files, value=None),
        )
    except Exception as e:
        return f"❌ Error: {str(e)}", gr.update()


def access_dataset_zip(email, coupons_str):
    """Build a ZIP with the data partition unlocked by the given coupons.

    Every metadata CSV in the repository describes one audio segment and its
    ``order`` within the upload. Segments with an odd ``order`` are training
    files and are included (with label) only when ``order`` does not exceed
    the number of valid coupons. Segments with an even ``order`` are test
    files and are always included with the label replaced by ``"HIDDEN"``.

    A coupon is valid when it equals ``generate_coupon(file_name)`` for a file
    currently described in the repository; duplicates count once.

    NOTE(review): ``email`` is only checked for presence; coupons are not
    tied to the contributor who earned them.

    Args:
        email: Email typed by the user.
        coupons_str: Comma-separated coupon codes.

    Returns:
        A 2-tuple ``(zip path or None, status message)``.
    """
    import shutil

    if not email or not coupons_str:
        return None, "⚠️ Please provide your email and coupons."

    # dict.fromkeys de-duplicates while keeping the order entered.
    coupons_list = list(
        dict.fromkeys(c.strip().upper() for c in coupons_str.split(",") if c.strip())
    )
    if not coupons_list:
        return None, "⚠️ No valid coupons provided."

    tmp_dir = None
    try:
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        meta_files = [
            f for f in all_files if f.startswith("metadata/") and f.endswith(".csv")
        ]

        # Pass 1: read every metadata row. All rows are needed before any
        # access decision, because coupons are checked against the full set.
        records = []
        for m_file in meta_files:
            local_meta = hf_hub_download(
                repo_id=DATASET_REPO_ID,
                filename=m_file,
                repo_type="dataset",
                token=HF_TOKEN,
            )
            try:
                row = pd.read_csv(local_meta).iloc[0]
                records.append((int(row["order"]), str(row["file_name"]), row["label"]))
            except (IndexError, KeyError, ValueError):
                # Empty or malformed metadata must not abort the whole export.
                print(f"⚠️ Failed to process metadata file: {m_file}")

        valid_coupons = {generate_coupon(name) for _, name, _ in records}
        num_coupons = sum(1 for c in coupons_list if c in valid_coupons)
        if num_coupons == 0:
            return None, "⚠️ No valid coupons provided."

        # Pass 2: download the selected audio and assemble the archive.
        tmp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(tmp_dir, "ENSIM_Data_Collection.zip")

        # This list will hold rows for our single combined CSV
        compiled_metadata = []
        with zipfile.ZipFile(zip_path, "w") as zipf:
            for order, audio_filename, label in records:
                is_training = order % 2 != 0

                # --- ACCESS LOGIC ---
                # Training files (odd): include only if within coupon count.
                if is_training and order > num_coupons:
                    continue

                audio_local = hf_hub_download(
                    repo_id=DATASET_REPO_ID,
                    filename=f"data/{audio_filename}",
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                subset = "training_set" if is_training else "test_set"
                zipf.write(audio_local, arcname=f"{subset}/{audio_filename}")

                # Test files (even) are listed with their label hidden.
                compiled_metadata.append({
                    "wav_filename": audio_filename,
                    "label": label if is_training else "HIDDEN",
                })

            # --- CREATE THE SINGLE CONSOLIDATED CSV ---
            if compiled_metadata:
                master_df = pd.DataFrame(compiled_metadata)
                master_csv_path = os.path.join(tmp_dir, "metadata_summary.csv")
                master_df.to_csv(master_csv_path, index=False, columns=["wav_filename", "label"])
                # Place it at the root of the ZIP for easy access
                zipf.write(master_csv_path, arcname="metadata_summary.csv")

        return zip_path, f"✅ ZIP created with {len(compiled_metadata)} total references."

    except Exception as e:
        # Do not leave a half-built archive behind.
        if tmp_dir is not None:
            shutil.rmtree(tmp_dir, ignore_errors=True)
        return None, f"❌ Error: {str(e)}"


# --- UI ---
with gr.Blocks() as demo:
    gr.Markdown("# 🎙️ Sound Data Platform")

    with gr.Tabs():
        # STUDENT TAB
        with gr.TabItem("Dataset Collection"):
            with gr.Row():
                email_input = gr.Textbox(label="Email", placeholder="test")
                login_btn = gr.Button("Verify", variant="primary")
            login_status = gr.Markdown("Waiting for login...")

            # Hidden until verify_user grants access.
            with gr.Column(visible=False) as recording_zone:
                label_input = gr.Radio(
                    choices=[label_codes["1"], label_codes["2"], label_codes["3"]],
                    label="Category",
                )
                audio_input = gr.Audio(label="Record (40s)", sources=["microphone"], type="filepath")
                submit_btn = gr.Button("🚀 Submit", variant="primary")
                res_msg = gr.Textbox(label="Status", interactive=False)
                coupon_display = gr.Textbox(label="🎟️ YOUR COUPON (Save this!)", interactive=False)

        # DATASET ACCESS TAB
        with gr.TabItem("Dataset Access"):
            gr.Markdown("""
            ### 🔓 Unlock Your Data Partition
            - **Training Data:** You receive Training samples (Audio + Label) proportional to your coupons.
            - **Test Data:** You receive the full global Test set (Audio Only) to evaluate your models.
            """)
            acc_email = gr.Textbox(label="Email")
            coupons_input = gr.Textbox(label="Coupons List (comma separated)", placeholder="C1, C2, C3...")
            download_btn = gr.Button("📦 Generate Data ZIP", variant="primary")
            status_out = gr.Textbox(label="Status")
            file_out = gr.File(label="Download Your Data")

        # ADMIN TAB
        with gr.TabItem("Administration"):
            with gr.Row():
                admin_user = gr.Textbox(label="Admin Username")
                admin_pass = gr.Textbox(label="Admin Password", type="password")
            admin_login_btn = gr.Button("Login Admin")
            admin_msg = gr.Markdown("Log in to manage files.")
            # This will show the statistics
            admin_stats_display = gr.Markdown("")

            # Hidden until admin_login succeeds.
            with gr.Column(visible=False) as admin_panel:
                file_dropdown = gr.Dropdown(label="Select File to Remove", choices=[])
                delete_btn = gr.Button("🗑️ Delete Selected File", variant="stop")
                delete_status = gr.Textbox(label="Delete Progress")

                gr.Markdown("### 🧨 Danger Zone")
                confirm_check = gr.Checkbox(label="I understand this will permanently delete ALL recordings and metadata.")
                delete_all_btn = gr.Button("🔥 DELETE ALL DATASET FILES", variant="stop")
                # Own name, so this box does not shadow "Delete Progress" and
                # each delete action reports into its own status field.
                delete_all_status = gr.Textbox(label="Status Log")

    # --- EVENT HANDLERS ---
    login_btn.click(verify_user, [email_input], [recording_zone, login_status])

    submit_btn.click(
        fn=upload_data,
        inputs=[email_input, label_input, audio_input],
        outputs=[res_msg, audio_input, label_input, coupon_display],
    )

    admin_login_btn.click(
        admin_login,
        [admin_user, admin_pass],
        [admin_panel, file_dropdown, admin_msg, admin_stats_display],
    )

    delete_btn.click(
        delete_selected_file,
        [file_dropdown],
        [delete_status, file_dropdown],
    )

    download_btn.click(
        fn=access_dataset_zip,
        inputs=[acc_email, coupons_input],
        outputs=[file_out, status_out],
    )

    delete_all_btn.click(
        fn=delete_all_files,
        inputs=[confirm_check],
        outputs=[delete_all_status, file_dropdown],
    )

if __name__ == "__main__":
    demo.launch(theme=gr.themes.Soft())