Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import hashlib | |
| import pandas as pd | |
| import zipfile | |
| import tempfile | |
| from datetime import datetime | |
| from huggingface_hub import HfApi, hf_hub_download,CommitOperationDelete | |
| from pathlib import Path | |
| import librosa | |
| import soundfile as sf | |
| import tempfile | |
| import numpy as np | |
# Numeric code -> human-readable category name shown in the UI.
label_codes = dict(
    [
        ("1", "Engine"),
        ("2", "Environmental"),
        ("3", "Mechanical"),
    ]
)

# Reverse lookup: category name -> numeric code.
label_decoder = dict((name, code) for code, name in label_codes.items())
# --- CONFIGURATION ---
# Hugging Face dataset repository that stores the recordings and metadata.
DATASET_REPO_ID = "MeysamSh/ENSIMSoundDataCollection"

# Secrets are read from the environment (Space secrets).
HF_TOKEN = os.environ.get("HF_TOKEN")
# NOTE: when COUPON_SALT is unset this is None, and generate_coupon() then
# interpolates the literal text "None" as the salt, making coupons guessable.
COUPON_SALT = os.environ.get("COUPON_SALT")

# Admin credentials. ADMIN_PASSWORD is the SHA-256 hex digest of the password
# (admin_login hashes the submitted password before comparing). Both values
# can be overridden through the environment so the credential does not have
# to live in source control; the defaults keep the previous behavior.
ADMIN_USERNAME = os.environ.get("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD = os.environ.get(
    "ADMIN_PASSWORD_SHA256",
    "30c8663d3ca10ededd17ac1b55f3d533ab29cf1b8470b1729af09afda3f0a516",
)

# Entries are matched by verify_user against both the lower-cased e-mail and
# its SHA-256 hex digest, so either form may be listed here.
AUTHORIZED_USERS = [
    "5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8",
    "test",
]

# Bind the token to the client so calls that do not pass token= explicitly
# (e.g. list_repo_files) are authenticated the same way as the ones that do.
api = HfApi(token=HF_TOKEN)
| # --- LOGIC FUNCTIONS --- | |
def generate_coupon(filename):
    """Return the coupon code a student must keep for *filename*.

    The code is the first 10 hex characters (upper-cased) of the SHA-1 digest
    of the file name concatenated with the module-level COUPON_SALT.
    """
    salted_name = f"{filename}{COUPON_SALT}"
    digest = hashlib.sha1(salted_name.encode()).hexdigest()
    return digest[:10].upper()
def verify_user(email):
    """Check an e-mail against AUTHORIZED_USERS and toggle the recording zone.

    The e-mail is stripped and lower-cased; it is accepted when either the
    cleaned e-mail itself or its SHA-256 hex digest appears in
    AUTHORIZED_USERS.

    Returns a ``(gr.update, status_message)`` pair: the update shows the
    recording zone on success and hides it otherwise.
    """
    if not email:
        return gr.update(visible=False), "β οΈ Enter email."

    clean_email = email.strip().lower()
    hashed_email = hashlib.sha256(clean_email.encode()).hexdigest()
    accepted = any(
        candidate in AUTHORIZED_USERS
        for candidate in (clean_email, hashed_email)
    )
    if not accepted:
        return gr.update(visible=False), "π« Not authorized."
    return gr.update(visible=True), f"β Access Granted: {clean_email}"
def _authorized_index(clean_email):
    """Return the position of *clean_email* in AUTHORIZED_USERS, or None.

    An entry matches when it equals either the cleaned e-mail or its SHA-256
    hex digest, mirroring the check done by verify_user.
    """
    email_hash = hashlib.sha256(clean_email.encode()).hexdigest()
    for candidate in (clean_email, email_hash):
        if candidate in AUTHORIZED_USERS:
            return AUTHORIZED_USERS.index(candidate)
    return None


def _split_into_segments(y, sr, duration):
    """Cut the signal into 2-second segments.

    Recordings shorter than 7 s contribute only their first 2 seconds.
    Longer recordings skip the first 3 seconds and are then cut into
    consecutive, non-overlapping 2-second windows (a trailing partial window
    is dropped).
    """
    window_size = int(2 * sr)
    if duration < 7.0:
        return [y[:window_size]]
    remaining_audio = y[int(3 * sr):]
    return [
        remaining_audio[i:i + window_size]
        for i in range(0, len(remaining_audio) - window_size + 1, window_size)
    ]


def upload_data(email, label, audio_path):
    """Validate a recording, split it into segments and upload them.

    Args:
        email: e-mail typed in the login box; must match AUTHORIZED_USERS.
        label: category chosen by the user.
        audio_path: path of the recorded audio file, or None.

    Returns:
        A 4-tuple for the Gradio outputs
        ``(status message, audio component value/update, label update,
        comma-separated coupons)``. Coupons are an empty string on failure.
    """
    # --- Energy Threshold Setting ---
    ENERGY_THRESHOLD = 0.02  # Adjust this: 0.01 is very sensitive, 0.05 is strict

    if audio_path is None:
        return "β οΈ Please record or upload a sound file.", None, gr.update(), ""
    if not label:
        return "β οΈ Please select a category label.", gr.update(), gr.update(), ""

    # Re-check authorization here: hiding the recording zone in the UI does
    # not stop this handler from being called with an arbitrary e-mail.
    clean_email = (email or "").strip().lower()
    email_index = _authorized_index(clean_email)
    if email_index is None:
        return "π« Not authorized.", gr.update(), gr.update(), ""

    try:
        y, sr = librosa.load(audio_path, sr=None)
        duration = librosa.get_duration(y=y, sr=sr)
        if duration < 2.0:
            return f"β οΈ Sound too short ({duration:.1f}s).", gr.update(), gr.update(), ""

        # --- SPLITTING LOGIC ---
        raw_segments = _split_into_segments(y, sr, duration)

        # --- ENERGY CALCULATION & FILTERING ---
        # Keep segments whose RMS energy, sqrt(mean(x^2)), reaches the threshold.
        valid_segments = [
            seg for seg in raw_segments
            if np.sqrt(np.mean(seg ** 2)) >= ENERGY_THRESHOLD
        ]
        rejected_count = len(raw_segments) - len(valid_segments)
        if not valid_segments:
            return f"β Rejected: {rejected_count} segments were too quiet. Please record closer to the source.", None, gr.update(), ""

        # --- UPLOAD PROCESS ---
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        coupons = []
        for idx, seg in enumerate(valid_segments):
            seg_filename = f"{email_index}_{timestamp}_seg{idx}.wav"
            # Reserve a temp path, then close the handle before writing to it.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_seg:
                tmp_path = tmp_seg.name
            try:
                sf.write(tmp_path, seg, sr)
                api.upload_file(
                    path_or_fileobj=tmp_path,
                    path_in_repo=f"data/{seg_filename}",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                meta_content = f"user_id,label,file_name,time,order\n{clean_email},{label},{seg_filename},{timestamp},{idx+1}"
                api.upload_file(
                    path_or_fileobj=meta_content.encode(),
                    path_in_repo=f"metadata/meta_{email_index}_{timestamp}_seg{idx}.csv",
                    repo_id=DATASET_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            finally:
                # Always remove the temp file, even when an upload fails.
                os.unlink(tmp_path)
            coupons.append(generate_coupon(seg_filename))

        status_msg = f"π Success! {len(valid_segments)} samples accepted."
        if rejected_count > 0:
            status_msg += f" ({rejected_count} quiet segments discarded)."
        return status_msg, None, gr.update(value=None), ", ".join(coupons)
    except Exception as e:
        return f"β Error: {str(e)}", gr.update(), gr.update(), ""
| # --- ADMIN LOGIC --- | |
def delete_all_files(confirm):
    """Delete every file under ``data/`` and ``metadata/`` in one commit.

    Args:
        confirm: value of the confirmation checkbox; nothing is deleted
            unless it is truthy.

    Returns:
        ``(status message, update for the file dropdown)``.
    """
    if not confirm:
        return "β οΈ You must check the 'Confirm' box to delete everything.", gr.update()
    try:
        # 1. Get all files in the repo (token passed like every other call).
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        # 2. Keep only files in the folders this app manages.
        files_to_delete = [f for f in all_files if f.startswith(("data/", "metadata/"))]
        if not files_to_delete:
            return "βΉοΈ The dataset is already empty.", gr.update(choices=[])
        # 3. A single commit avoids one API call per file.
        operations = [CommitOperationDelete(path_in_repo=f) for f in files_to_delete]
        api.create_commit(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            operations=operations,
            commit_message=f"Admin: Bulk delete of {len(files_to_delete)} files",
            token=HF_TOKEN,
        )
        return f"π₯ Success! Deleted {len(files_to_delete)} files. Dataset is now clean.", gr.update(choices=[], value=None)
    except Exception as e:
        return f"β Bulk delete failed: {str(e)}", gr.update()
def get_stats():
    """Compute dataset statistics from the repository contents.

    Returns:
        ``(audio_files, stats_md)`` where ``audio_files`` is the list of
        ``data/*.wav`` paths in the repo and ``stats_md`` is a Markdown
        summary (total recordings, unique contributors, per-label counts).
        On failure returns ``([], error message)``.
    """
    try:
        # List all files once to avoid multiple API calls.
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        audio_files = [f for f in all_files if f.startswith("data/") and f.endswith(".wav")]
        metadata_files = [f for f in all_files if f.startswith("metadata/") and f.endswith(".csv")]
        print(f"Found {len(audio_files)} audio files and {len(metadata_files)} metadata files in the repository.")

        # 1. Unique contributors: the user index is the file-name prefix
        #    before the first underscore.
        user_indices = {f.split("/")[-1].split("_")[0] for f in audio_files}

        # 2. Files per category; known labels start at zero so they are
        #    always listed, unknown labels are added as they are encountered.
        category_counts = {name: 0 for name in label_codes.values()}
        for m_file in metadata_files:
            try:
                # Download and read the small metadata file.
                file_path = hf_hub_download(repo_id=DATASET_REPO_ID, filename=m_file, repo_type="dataset", token=HF_TOKEN)
                with open(file_path, "r", encoding="utf-8") as meta_fh:
                    content = meta_fh.readlines()
                if len(content) > 1:
                    # Row layout: user_id,label,file_name,time,order
                    label = content[1].split(",")[1].strip()
                    category_counts[label] = category_counts.get(label, 0) + 1
            except Exception:
                print(f"β οΈ Failed to process metadata file: {m_file}")
                continue  # Skip files that fail to download or parse

        # 3. Format the stats string.
        stats_md = "### π Dataset Statistics\n"
        stats_md += f"**Total Recordings:** {len(audio_files)} \n"
        stats_md += f"**Unique Contributors:** {len(user_indices)} \n\n"
        stats_md += "**Category Breakdown:**\n"
        stats_md += "".join(
            f"- **{cat}:** {count} files\n" for cat, count in category_counts.items()
        )
        return audio_files, stats_md
    except Exception as e:
        return [], f"β οΈ Error retrieving stats: {str(e)}"
def admin_login(user, pwd):
    """Authenticate the administrator and reveal the admin panel.

    Args:
        user: submitted username.
        pwd: submitted clear-text password; its SHA-256 hex digest is
            compared with ADMIN_PASSWORD.

    Returns:
        ``(panel visibility update, dropdown update, status message,
        statistics markdown)``.
    """
    import hmac

    # Missing values are treated as empty strings instead of raising.
    user = user or ""
    pwd = pwd or ""
    pwd_hash = hashlib.sha256(pwd.encode()).hexdigest()

    # Constant-time comparisons, both always evaluated, so response timing
    # does not reveal which credential was wrong. Bytes are used because
    # compare_digest rejects non-ASCII str arguments.
    user_ok = hmac.compare_digest(user.encode(), ADMIN_USERNAME.encode())
    pwd_ok = hmac.compare_digest(pwd_hash.encode(), ADMIN_PASSWORD.encode())

    if user_ok and pwd_ok:
        audio_files, stats_text = get_stats()
        return gr.update(visible=True), gr.update(choices=audio_files), "π Admin Authenticated", stats_text
    return gr.update(visible=False), gr.update(choices=[]), "β Invalid Credentials", ""
def delete_selected_file(file_path):
    """Delete one audio file and its companion metadata file from the repo.

    Args:
        file_path: repository path of the audio file (``data/<name>.wav``).

    Returns:
        ``(status message, update for the file dropdown)``.
    """
    if not file_path:
        return "β οΈ Select a file.", gr.update()

    data_prefix = "data/"
    # Only recordings may be removed through this handler.
    if not file_path.startswith(data_prefix):
        return "β Error: only files under data/ can be deleted here.", gr.update()

    try:
        api.delete_file(path_in_repo=file_path, repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN)

        # Build metadata/meta_<name>.csv by touching only the leading folder
        # and the trailing extension (str.replace would rewrite every match).
        stem = file_path[len(data_prefix):]
        if stem.endswith(".wav"):
            stem = stem[:-len(".wav")] + ".csv"
        meta_path = f"metadata/meta_{stem}"

        # Metadata removal stays best-effort, but a failure is now reported
        # instead of being silently swallowed.
        meta_note = ""
        try:
            api.delete_file(path_in_repo=meta_path, repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN)
        except Exception as meta_err:
            meta_note = f" (metadata {meta_path} not removed: {meta_err})"

        audio_files, stats_text = get_stats()
        return f"ποΈ Deleted {file_path}.{meta_note} {stats_text}", gr.update(choices=audio_files, value=None)
    except Exception as e:
        return f"β Error: {str(e)}", gr.update()
def access_dataset_zip(email, coupons_str):
    """Build a ZIP with the data partition unlocked by the given coupons.

    Every metadata file describes one audio segment and its ``order``.
    Odd orders are training samples, included (with label) only while
    ``order`` does not exceed the number of valid coupons. Even orders are
    test samples, always included with the label replaced by ``HIDDEN``.

    Args:
        email: requester e-mail; only checked for being non-empty.
        coupons_str: comma-separated coupon codes.

    Returns:
        ``(zip_path, status message)``; ``zip_path`` is None on failure.
    """
    if not email or not coupons_str:
        return None, "β οΈ Please provide your email and coupons."

    # A set, so pasting the same coupon repeatedly does not inflate the count.
    submitted = {c.strip().upper() for c in coupons_str.split(",") if c.strip()}
    if not submitted:
        return None, "β οΈ No valid coupons provided."

    try:
        all_files = api.list_repo_files(
            repo_id=DATASET_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        meta_files = [f for f in all_files if f.startswith("metadata/") and f.endswith(".csv")]

        # Pass 1: read every metadata row. A malformed file is skipped
        # rather than aborting the whole request.
        records = []
        for m_file in meta_files:
            try:
                local_meta = hf_hub_download(repo_id=DATASET_REPO_ID, filename=m_file, repo_type="dataset", token=HF_TOKEN)
                row = pd.read_csv(local_meta).iloc[0]
                records.append((str(row["file_name"]), row["label"], int(row["order"])))
            except Exception as meta_err:
                print(f"Skipping unreadable metadata file {m_file}: {meta_err}")

        # Count only coupons that were really issued for a stored recording;
        # previously any comma-separated text counted as a coupon.
        # NOTE(review): coupons are not tied to the requester's e-mail here;
        # the metadata 'user_id' column could be used for that - confirm intent.
        issued = {generate_coupon(name) for name, _, _ in records}
        num_coupons = len(submitted & issued)
        if num_coupons == 0:
            return None, "β οΈ No valid coupons provided."

        tmp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(tmp_dir, "ENSIM_Data_Collection.zip")

        # Rows of the single combined CSV placed at the ZIP root.
        compiled_metadata = []
        with zipfile.ZipFile(zip_path, "w") as zipf:
            # Pass 2: add the audio files this requester may access.
            for audio_filename, label, order in records:
                is_training = order % 2 != 0
                if is_training and order > num_coupons:
                    continue  # training sample beyond the unlocked quota

                audio_local = hf_hub_download(repo_id=DATASET_REPO_ID, filename=f"data/{audio_filename}", repo_type="dataset", token=HF_TOKEN)
                subset = "training_set" if is_training else "test_set"
                zipf.write(audio_local, arcname=f"{subset}/{audio_filename}")
                compiled_metadata.append({
                    "wav_filename": audio_filename,
                    # Test labels are withheld.
                    "label": label if is_training else "HIDDEN",
                })

            # --- CREATE THE SINGLE CONSOLIDATED CSV ---
            if compiled_metadata:
                master_df = pd.DataFrame(compiled_metadata)
                # to_csv() without a path returns the CSV text, so no
                # intermediate file is needed.
                csv_text = master_df.to_csv(index=False, columns=["wav_filename", "label"])
                zipf.writestr("metadata_summary.csv", csv_text)

        return zip_path, f"β ZIP created with {len(compiled_metadata)} total references."
    except Exception as e:
        return None, f"β Error: {str(e)}"
| # --- UI --- | |
# NOTE(review): the original indentation was lost; the nesting of components
# inside rows/columns below is reconstructed - confirm against the intended
# layout.
with gr.Blocks() as demo:
    gr.Markdown("# ποΈ Sound Data Platform")
    with gr.Tabs():
        # STUDENT TAB
        with gr.TabItem("Dataset Collection"):
            with gr.Row():
                email_input = gr.Textbox(label="Email", placeholder="test")
                login_btn = gr.Button("Verify", variant="primary")
            login_status = gr.Markdown("Waiting for login...")
            # Hidden until verify_user grants access.
            with gr.Column(visible=False) as recording_zone:
                label_input = gr.Radio(choices=[label_codes["1"], label_codes["2"], label_codes["3"]], label="Category")
                audio_input = gr.Audio(label="Record (40s)", sources=["microphone"], type="filepath")
                submit_btn = gr.Button("π Submit", variant="primary")
                res_msg = gr.Textbox(label="Status", interactive=False)
                coupon_display = gr.Textbox(label="ποΈ YOUR COUPON (Save this!)", interactive=False)

        # 2. DATASET ACCESS TAB
        with gr.TabItem("Dataset Access"):
            gr.Markdown("""
### π Unlock Your Data Partition
- **Training Data:** You receive Training samples (Audio + Label) proportional to your coupons.
- **Test Data:** You receive the full global Test set (Audio Only) to evaluate your models.
""")
            acc_email = gr.Textbox(label="Email")
            coupons_input = gr.Textbox(label="Coupons List (comma separated)", placeholder="C1, C2, C3...")
            download_btn = gr.Button("π¦ Generate Data ZIP", variant="primary")
            status_out = gr.Textbox(label="Status")
            file_out = gr.File(label="Download Your Data")

        # ADMIN TAB
        with gr.TabItem("Administration"):
            with gr.Row():
                admin_user = gr.Textbox(label="Admin Username")
                admin_pass = gr.Textbox(label="Admin Password", type="password")
            admin_login_btn = gr.Button("Login Admin")
            admin_msg = gr.Markdown("Log in to manage files.")
            # This will show the statistics
            admin_stats_display = gr.Markdown("")
            # Hidden until admin_login succeeds.
            with gr.Column(visible=False) as admin_panel:
                file_dropdown = gr.Dropdown(label="Select File to Remove", choices=[])
                delete_btn = gr.Button("ποΈ Delete Selected File", variant="stop")
                delete_status = gr.Textbox(label="Delete Progress")
                gr.Markdown("### 𧨠Danger Zone")
                confirm_check = gr.Checkbox(label="I understand this will permanently delete ALL recordings and metadata.")
                delete_all_btn = gr.Button("π₯ DELETE ALL DATASET FILES", variant="stop")
                # Separate name: reusing `delete_status` here rebound the
                # variable and left the "Delete Progress" box unwired.
                delete_all_status = gr.Textbox(label="Status Log")

    # --- EVENT HANDLERS ---
    login_btn.click(verify_user, [email_input], [recording_zone, login_status])
    submit_btn.click(
        fn=upload_data,
        inputs=[email_input, label_input, audio_input],
        outputs=[res_msg, audio_input, label_input, coupon_display],
    )
    admin_login_btn.click(
        admin_login,
        [admin_user, admin_pass],
        [admin_panel, file_dropdown, admin_msg, admin_stats_display],
    )
    # Single-file deletion reports to "Delete Progress".
    delete_btn.click(
        delete_selected_file,
        [file_dropdown],
        [delete_status, file_dropdown],
    )
    download_btn.click(
        fn=access_dataset_zip,
        inputs=[acc_email, coupons_input],
        outputs=[file_out, status_out],
    )
    # Bulk deletion reports to "Status Log".
    delete_all_btn.click(
        fn=delete_all_files,
        inputs=[confirm_check],
        outputs=[delete_all_status, file_dropdown],
    )
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    soft_theme = gr.themes.Soft()
    demo.launch(theme=soft_theme)