"""Collect real appliance fault audio from YouTube. Searches for specific fault types, downloads audio, extracts 10-second clips, and saves them with labels for training. Usage: python collect_data.py # Run full collection python collect_data.py --dry-run # Preview search results only python collect_data.py --max-videos 5 # Limit videos per query """ import os import json import subprocess import sys from pathlib import Path from datetime import datetime # ffmpeg path FFMPEG = r"C:\Users\sushr\AppData\Roaming\Python\Python313\site-packages\imageio_ffmpeg\binaries\ffmpeg-win-x86_64-v7.1.exe" DATA_DIR = Path(__file__).parent / "data" RAW_DIR = DATA_DIR / "raw" CLIPS_DIR = DATA_DIR / "clips" LABELS_FILE = DATA_DIR / "labels" / "labels.jsonl" # Search queries: (fault_type, appliance, search_query, urgency) QUERIES = [ # Washing machine faults ("bearing_failure", "Washing machine", "washing machine bearing noise grinding", "HIGH"), ("belt_slip", "Washing machine", "washing machine belt squeal noise", "MEDIUM"), ("load_imbalance", "Washing machine", "washing machine unbalanced load vibration", "LOW"), ("foreign_object", "Washing machine", "washing machine object stuck rattling noise", "MEDIUM"), ("pump_failure", "Washing machine", "washing machine drain pump noise grinding", "MEDIUM"), ("motor_failure", "Washing machine", "washing machine motor noise humming whining", "HIGH"), # Electric fan faults ("blade_imbalance", "Electric fan", "electric fan wobbling noise unbalanced blade", "MEDIUM"), ("bearing_failure", "Electric fan", "electric fan motor bearing noise grinding", "HIGH"), ("blade_strike", "Electric fan", "electric fan hitting noise blade strike", "HIGH"), # Refrigerator faults ("compressor_failure", "Refrigerator/Freezer", "refrigerator compressor noise loud grinding", "HIGH"), ("evaporator_fan", "Refrigerator/Freezer", "refrigerator evaporator fan noise buzzing", "MEDIUM"), ("condenser_fan", "Refrigerator/Freezer", "refrigerator condenser fan noise grinding", "MEDIUM"), # Dishwasher faults ("pump_bearing", "Dishwasher", "dishwasher pump noise grinding bearing", "HIGH"), ("spray_arm", "Dishwasher", "dishwasher spray arm noise hitting", "LOW"), ("drain_pump", "Dishwasher", "dishwasher drain pump noise cavitation", "MEDIUM"), # Dryer faults ("drum_roller", "Tumble dryer", "dryer thumping noise drum roller worn", "HIGH"), ("belt_slip", "Tumble dryer", "dryer squealing noise belt slipping", "MEDIUM"), ("foreign_object", "Tumble dryer", "dryer rattling noise coins buttons", "LOW"), # Car engine faults ("rod_knock", "Car engine", "car engine rod knock noise deep knocking", "CRITICAL"), ("belt_squeal", "Car engine", "car serpentine belt squeal noise", "MEDIUM"), ("exhaust_leak", "Car engine", "car exhaust leak tick noise", "MEDIUM"), # Generic motor faults ("bearing_failure", "Electric motor (generic)", "electric motor bearing noise grinding", "HIGH"), ("brush_arcing", "Electric motor (generic)", "electric motor brush arcing noise sparking", "MEDIUM"), ("electrical_hum", "Electric motor (generic)", "electric motor humming loose lamination", "LOW"), ] def search_youtube(query, max_videos=3): """Search YouTube and return video info.""" import yt_dlp ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': True, 'force_generic_extractor': False, 'default_search': 'ytsearch', } results = [] try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: search_query = f"ytsearch{max_videos}:{query}" info = ydl.extract_info(search_query, download=False) if info and 'entries' in info: for entry in info['entries']: if entry: results.append({ 'id': entry.get('id'), 'title': entry.get('title'), 'url': f"https://www.youtube.com/watch?v={entry.get('id')}", 'duration': entry.get('duration'), }) except Exception as e: print(f" Search error: {e}") return results def download_audio(video_url, output_path): """Download audio from a YouTube video.""" import yt_dlp ydl_opts = { 'quiet': True, 'no_warnings': True, 'format': 'bestaudio/best', 'outtmpl': str(output_path) + '.%(ext)s', } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(video_url, download=True) # Find downloaded file for ext in ['webm', 'm4a', 'mp3', 'opus', 'wav']: candidate = output_path.with_suffix(f'.{ext}') if candidate.exists(): # Convert to wav using our ffmpeg wav_path = output_path.with_suffix('.wav') cmd = [FFMPEG, "-y", "-i", str(candidate), "-ar", "22050", "-ac", "1", str(wav_path)] subprocess.run(cmd, capture_output=True, timeout=30) candidate.unlink(missing_ok=True) if wav_path.exists(): return True return False except Exception as e: print(f" Download error: {e}") return False def extract_clip(audio_path, output_path, start_sec=0, duration_sec=10): """Extract a clip from an audio file using ffmpeg.""" cmd = [ FFMPEG, "-y", "-i", str(audio_path), "-ss", str(start_sec), "-t", str(duration_sec), "-ar", "22050", "-ac", "1", str(output_path) ] try: subprocess.run(cmd, capture_output=True, timeout=30) return output_path.exists() except Exception as e: print(f" FFmpeg error: {e}") return False def collect_fault_audio(dry_run=False, max_videos=3): """Main collection pipeline.""" labels = [] total_clips = 0 print(f"Collecting {len(QUERIES)} fault types, max {max_videos} videos each") print(f"Output: {CLIPS_DIR}") print() for i, (fault_type, appliance, query, urgency) in enumerate(QUERIES): print(f"[{i+1}/{len(QUERIES)}] {fault_type} ({appliance})") print(f" Query: {query}") videos = search_youtube(query, max_videos) print(f" Found: {len(videos)} videos") if dry_run: for v in videos: print(f" {v['title'][:60]} ({v['duration']}s)") continue for v in videos: print(f" Downloading: {v['title'][:50]}...") # Download full audio raw_path = RAW_DIR / f"{v['id']}" if not download_audio(v['url'], raw_path): continue # Find the downloaded file (may have different extension) raw_file = None for ext in ['.wav', '.mp3', '.m4a', '.webm']: candidate = raw_path.with_suffix(ext) if candidate.exists(): raw_file = candidate break if not raw_file: print(f" No audio file found for {v['id']}") continue # Extract 10-second clips at different start points duration = v.get('duration', 60) or 60 clip_starts = [5, 15, 30, 45] # Skip intros clips_made = 0 for start in clip_starts: if start + 10 > duration: continue clip_name = f"{v['id']}_{start:03d}.wav" clip_path = CLIPS_DIR / clip_name if extract_clip(raw_file, clip_path, start, 10): label = { "file": clip_name, "fault_type": fault_type, "appliance": appliance, "urgency": urgency, "source": v['url'], "source_title": v['title'], "start_sec": start, "collected_at": datetime.now().isoformat(), } labels.append(label) total_clips += 1 clips_made += 1 if clips_made >= 2: # Max 2 clips per video break print(f" Extracted {clips_made} clips") # Clean up raw file try: raw_file.unlink() except: pass # Save labels after each query save_labels(labels) print(f"\n{'='*60}") print(f"Collection complete!") print(f"Total clips: {total_clips}") print(f"Labels: {LABELS_FILE}") # Print summary from collections import Counter fault_counts = Counter(l['fault_type'] for l in labels) print(f"\nClips per fault type:") for fault, count in fault_counts.most_common(): print(f" {fault}: {count}") return labels def save_labels(labels): """Save labels to JSONL file.""" LABELS_FILE.parent.mkdir(parents=True, exist_ok=True) with open(LABELS_FILE, 'w') as f: for label in labels: f.write(json.dumps(label) + '\n') def load_labels(): """Load labels from JSONL file.""" if not LABELS_FILE.exists(): return [] labels = [] with open(LABELS_FILE) as f: for line in f: if line.strip(): labels.append(json.loads(line)) return labels if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Collect appliance fault audio from YouTube") parser.add_argument("--dry-run", action="store_true", help="Preview search results only") parser.add_argument("--max-videos", type=int, default=3, help="Max videos per query") args = parser.parse_args() collect_fault_audio(dry_run=args.dry_run, max_videos=args.max_videos)