Spaces:
Runtime error
Runtime error
"""Collect real appliance fault audio from YouTube.

Searches for specific fault types, downloads audio, extracts 10-second clips,
and saves them with labels for training.

Usage:
    python collect_data.py                  # Run full collection
    python collect_data.py --dry-run        # Preview search results only
    python collect_data.py --max-videos 5   # Limit videos per query
"""
| import os | |
| import json | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime | |
# Path of the ffmpeg binary on the original development machine (the copy
# bundled with imageio-ffmpeg). Kept as the final fallback so behaviour on
# that machine is unchanged.
_DEFAULT_FFMPEG = r"C:\Users\sushr\AppData\Roaming\Python\Python313\site-packages\imageio_ffmpeg\binaries\ffmpeg-win-x86_64-v7.1.exe"


def resolve_ffmpeg():
    """Return the path of the ffmpeg executable to use.

    Resolution order:
      1. the ``FFMPEG_BINARY`` environment variable, if set and non-empty;
      2. the original hard-coded path, if that file exists;
      3. an ``ffmpeg`` executable found on ``PATH``;
      4. the original hard-coded path (unchanged behaviour when nothing
         else is available).
    """
    import shutil

    override = os.environ.get("FFMPEG_BINARY")
    if override:
        return override
    if os.path.isfile(_DEFAULT_FFMPEG):
        return _DEFAULT_FFMPEG
    return shutil.which("ffmpeg") or _DEFAULT_FFMPEG


# ffmpeg path
FFMPEG = resolve_ffmpeg()

# All collected data lives in a "data" directory next to this script.
DATA_DIR = Path(__file__).parent / "data"
RAW_DIR = DATA_DIR / "raw"        # temporary full-length downloads
CLIPS_DIR = DATA_DIR / "clips"    # extracted training clips
LABELS_FILE = DATA_DIR / "labels" / "labels.jsonl"
# Fault catalogue grouped by appliance. Each entry is
# (fault_type, search_query, urgency); the appliance name is the dict key.
_QUERIES_BY_APPLIANCE = {
    "Washing machine": [
        ("bearing_failure", "washing machine bearing noise grinding", "HIGH"),
        ("belt_slip", "washing machine belt squeal noise", "MEDIUM"),
        ("load_imbalance", "washing machine unbalanced load vibration", "LOW"),
        ("foreign_object", "washing machine object stuck rattling noise", "MEDIUM"),
        ("pump_failure", "washing machine drain pump noise grinding", "MEDIUM"),
        ("motor_failure", "washing machine motor noise humming whining", "HIGH"),
    ],
    "Electric fan": [
        ("blade_imbalance", "electric fan wobbling noise unbalanced blade", "MEDIUM"),
        ("bearing_failure", "electric fan motor bearing noise grinding", "HIGH"),
        ("blade_strike", "electric fan hitting noise blade strike", "HIGH"),
    ],
    "Refrigerator/Freezer": [
        ("compressor_failure", "refrigerator compressor noise loud grinding", "HIGH"),
        ("evaporator_fan", "refrigerator evaporator fan noise buzzing", "MEDIUM"),
        ("condenser_fan", "refrigerator condenser fan noise grinding", "MEDIUM"),
    ],
    "Dishwasher": [
        ("pump_bearing", "dishwasher pump noise grinding bearing", "HIGH"),
        ("spray_arm", "dishwasher spray arm noise hitting", "LOW"),
        ("drain_pump", "dishwasher drain pump noise cavitation", "MEDIUM"),
    ],
    "Tumble dryer": [
        ("drum_roller", "dryer thumping noise drum roller worn", "HIGH"),
        ("belt_slip", "dryer squealing noise belt slipping", "MEDIUM"),
        ("foreign_object", "dryer rattling noise coins buttons", "LOW"),
    ],
    "Car engine": [
        ("rod_knock", "car engine rod knock noise deep knocking", "CRITICAL"),
        ("belt_squeal", "car serpentine belt squeal noise", "MEDIUM"),
        ("exhaust_leak", "car exhaust leak tick noise", "MEDIUM"),
    ],
    "Electric motor (generic)": [
        ("bearing_failure", "electric motor bearing noise grinding", "HIGH"),
        ("brush_arcing", "electric motor brush arcing noise sparking", "MEDIUM"),
        ("electrical_hum", "electric motor humming loose lamination", "LOW"),
    ],
}

# Search queries: (fault_type, appliance, search_query, urgency)
# Flattened in catalogue order (dicts preserve insertion order).
QUERIES = [
    (fault_type, appliance, search_query, urgency)
    for appliance, faults in _QUERIES_BY_APPLIANCE.items()
    for fault_type, search_query, urgency in faults
]
def search_youtube(query, max_videos=3):
    """Search YouTube for *query* and return metadata for the top hits.

    Args:
        query: Free-text search string.
        max_videos: Maximum number of results to request. Values below 1
            return an empty list without contacting YouTube.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``url`` and
        ``duration``. ``title`` and ``duration`` are copied from the yt-dlp
        entry as-is and may be ``None``. Entries without an id are dropped
        because no usable URL or file name can be built from them. A search
        failure is reported on stdout and the results gathered so far are
        returned.
    """
    # "ytsearch0:" is not a valid search prefix, so don't even try.
    if max_videos < 1:
        return []

    import yt_dlp

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,  # metadata only; do not resolve each video
        'force_generic_extractor': False,
        'default_search': 'ytsearch',
    }
    results = []
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            search_query = f"ytsearch{max_videos}:{query}"
            info = ydl.extract_info(search_query, download=False)
        entries = info.get('entries') if info else None
        for entry in entries or []:
            if not entry:
                continue
            video_id = entry.get('id')
            if not video_id:
                # Would otherwise yield ".../watch?v=None".
                continue
            results.append({
                'id': video_id,
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={video_id}",
                'duration': entry.get('duration'),
            })
    except Exception as e:
        # Best effort: one failing query must not abort the whole collection.
        print(f" Search error: {e}")
    return results
def download_audio(video_url, output_path):
    """Download a video's audio track and convert it to 22.05 kHz mono WAV.

    Args:
        video_url: URL handed to yt-dlp.
        output_path: Destination path *without* extension. The result is
            written to ``output_path.with_suffix('.wav')``.

    Returns:
        ``True`` if ffmpeg succeeded and the WAV file exists, ``False``
        otherwise. Errors are reported on stdout rather than raised.
    """
    import yt_dlp

    output_path = Path(output_path)
    wav_path = output_path.with_suffix('.wav')
    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'format': 'bestaudio/best',
        'outtmpl': str(output_path) + '.%(ext)s',
    }
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(video_url, download=True)

        # Find downloaded file
        for ext in ['webm', 'm4a', 'mp3', 'opus', 'wav']:
            candidate = output_path.with_suffix(f'.{ext}')
            if not candidate.exists():
                continue

            # ffmpeg cannot read and write the same file, so a download that
            # is already .wav is resampled into a temporary sibling first.
            in_place = candidate == wav_path
            target = (
                wav_path.with_name(f"{wav_path.stem}.resampled.wav")
                if in_place else wav_path
            )

            # Convert to wav using our ffmpeg
            cmd = [FFMPEG, "-y", "-i", str(candidate),
                   "-ar", "22050", "-ac", "1", str(target)]
            result = subprocess.run(cmd, capture_output=True, timeout=30)
            converted = result.returncode == 0 and target.exists()

            if in_place:
                if converted:
                    target.replace(wav_path)
                else:
                    target.unlink(missing_ok=True)
                    candidate.unlink(missing_ok=True)
            else:
                candidate.unlink(missing_ok=True)
                if not converted:
                    # Drop partial output so it is not mistaken for a result.
                    wav_path.unlink(missing_ok=True)

            if converted:
                return True
        return False
    except Exception as e:
        # Best effort: a failed download just skips this video.
        print(f" Download error: {e}")
        return False
def extract_clip(audio_path, output_path, start_sec=0, duration_sec=10):
    """Extract a clip from an audio file using ffmpeg.

    The clip is resampled to 22.05 kHz mono.

    Args:
        audio_path: Source audio file.
        output_path: Where to write the clip; its parent directory is
            created if missing, and an existing file is overwritten.
        start_sec: Offset of the clip start within the source, in seconds.
        duration_sec: Clip length in seconds.

    Returns:
        ``True`` only if ffmpeg exited successfully and the clip exists,
        ``False`` otherwise. Failures are reported on stdout.
    """
    output_path = Path(output_path)
    cmd = [
        FFMPEG, "-y",
        "-i", str(audio_path),
        "-ss", str(start_sec),
        "-t", str(duration_sec),
        "-ar", "22050",
        "-ac", "1",
        str(output_path)
    ]
    try:
        # ffmpeg does not create missing output directories itself.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        result = subprocess.run(cmd, capture_output=True, timeout=30)
    except (OSError, subprocess.SubprocessError) as e:
        # Missing binary, permission problems, or timeout.
        print(f" FFmpeg error: {e}")
        return False

    if result.returncode != 0:
        # A clip left over from an earlier run must not count as success.
        print(f" FFmpeg error: exit code {result.returncode}")
        return False
    return output_path.exists()
def _extract_video_clips(video, raw_file, fault_type, appliance, urgency):
    """Cut up to two 10-second clips from *raw_file* and return their labels."""
    duration = video.get('duration') or 60  # unknown duration: assume 60 s
    clip_labels = []
    for start in (5, 15, 30, 45):  # Skip intros
        if start + 10 > duration:
            continue
        clip_name = f"{video['id']}_{start:03d}.wav"
        if not extract_clip(raw_file, CLIPS_DIR / clip_name, start, 10):
            continue
        clip_labels.append({
            "file": clip_name,
            "fault_type": fault_type,
            "appliance": appliance,
            "urgency": urgency,
            "source": video['url'],
            "source_title": video.get('title'),
            "start_sec": start,
            "collected_at": datetime.now().isoformat(),
        })
        if len(clip_labels) >= 2:  # Max 2 clips per video
            break
    return clip_labels


def _merge_labels(previous, current):
    """Return *previous* labels not superseded by *current*, then *current*."""
    regenerated = {label["file"] for label in current}
    kept = [label for label in previous if label.get("file") not in regenerated]
    return kept + current


def collect_fault_audio(dry_run=False, max_videos=3):
    """Main collection pipeline.

    For every entry in ``QUERIES``: search YouTube, download each hit's
    audio, cut up to two 10-second clips into ``CLIPS_DIR`` and record a
    label per clip. The labels file is rewritten after each query so an
    interrupted run keeps its progress. Labels from earlier runs are kept
    unless this run regenerates the same clip file.

    Args:
        dry_run: If true, only print the search results; nothing is
            downloaded or written.
        max_videos: Maximum number of search results to process per query.

    Returns:
        The list of label dicts created during this run (empty for a dry run).
    """
    labels = []
    total_clips = 0

    previous_labels = []
    if not dry_run:
        # Neither yt-dlp's caller nor ffmpeg can be relied on to create these.
        RAW_DIR.mkdir(parents=True, exist_ok=True)
        CLIPS_DIR.mkdir(parents=True, exist_ok=True)
        try:
            previous_labels = load_labels()
        except (OSError, ValueError) as e:
            # Unreadable or corrupt file: fall back to overwriting it.
            print(f"Could not read existing labels: {e}")

    print(f"Collecting {len(QUERIES)} fault types, max {max_videos} videos each")
    print(f"Output: {CLIPS_DIR}")
    print()

    for i, (fault_type, appliance, query, urgency) in enumerate(QUERIES):
        print(f"[{i+1}/{len(QUERIES)}] {fault_type} ({appliance})")
        print(f" Query: {query}")
        videos = search_youtube(query, max_videos)
        print(f" Found: {len(videos)} videos")

        if dry_run:
            for v in videos:
                # Title can be missing in flat search results.
                print(f" {(v.get('title') or '')[:60]} ({v.get('duration')}s)")
            continue

        for v in videos:
            print(f" Downloading: {(v.get('title') or '')[:50]}...")

            # Download full audio
            raw_path = RAW_DIR / f"{v['id']}"
            if not download_audio(v['url'], raw_path):
                continue

            # Find the downloaded file (may have different extension)
            raw_file = None
            for ext in ['.wav', '.mp3', '.m4a', '.webm']:
                candidate = raw_path.with_suffix(ext)
                if candidate.exists():
                    raw_file = candidate
                    break
            if raw_file is None:
                print(f" No audio file found for {v['id']}")
                continue

            # Extract 10-second clips at different start points
            new_labels = _extract_video_clips(
                v, raw_file, fault_type, appliance, urgency
            )
            labels.extend(new_labels)
            total_clips += len(new_labels)
            print(f" Extracted {len(new_labels)} clips")

            # Clean up raw file
            try:
                raw_file.unlink()
            except OSError as e:
                # Leftover raw audio is harmless; report it and move on.
                print(f" Could not remove {raw_file.name}: {e}")

        # Save labels after each query
        save_labels(_merge_labels(previous_labels, labels))

    print(f"\n{'='*60}")
    print(f"Collection complete!")
    print(f"Total clips: {total_clips}")
    print(f"Labels: {LABELS_FILE}")

    # Print summary
    from collections import Counter
    fault_counts = Counter(l['fault_type'] for l in labels)
    print(f"\nClips per fault type:")
    for fault, count in fault_counts.most_common():
        print(f" {fault}: {count}")
    return labels
def save_labels(labels):
    """Save labels to the JSONL labels file, one JSON object per line.

    The file is replaced as a whole. Data is first written to a temporary
    sibling and then renamed over ``LABELS_FILE``, so an interruption
    mid-write leaves the previous file intact instead of truncated.

    Args:
        labels: Iterable of JSON-serialisable label dicts.
    """
    LABELS_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = LABELS_FILE.with_name(LABELS_FILE.name + ".tmp")
    # json.dumps escapes non-ASCII by default, so the content is plain ASCII
    # and readable under any ASCII-compatible encoding.
    with open(tmp_file, 'w', encoding='utf-8') as f:
        f.writelines(json.dumps(label) + '\n' for label in labels)
    tmp_file.replace(LABELS_FILE)
def load_labels():
    """Read all label records back from the JSONL labels file.

    Returns:
        A list with one parsed JSON value per non-blank line, in file order,
        or an empty list when the labels file does not exist.
    """
    if LABELS_FILE.exists():
        with open(LABELS_FILE) as handle:
            # Blank (whitespace-only) lines carry no record and are skipped.
            return [json.loads(row) for row in handle if row.strip()]
    return []
if __name__ == "__main__":
    import argparse

    # Command-line entry point. The two options map one-to-one onto the
    # keyword arguments of collect_fault_audio (dry_run, max_videos).
    cli = argparse.ArgumentParser(
        description="Collect appliance fault audio from YouTube"
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview search results only",
    )
    cli.add_argument(
        "--max-videos",
        type=int,
        default=3,
        help="Max videos per query",
    )
    options = cli.parse_args()
    collect_fault_audio(**vars(options))