sound-broken / collect_data.py
mitvho09's picture
Upload Space app
edb671a verified
Raw
History Blame Contribute Delete
10.1 kB
"""Collect real appliance fault audio from YouTube.
Searches for specific fault types, downloads audio, extracts 10-second clips,
and saves them with labels for training.
Usage:
python collect_data.py # Run full collection
python collect_data.py --dry-run # Preview search results only
python collect_data.py --max-videos 5 # Limit videos per query
"""
import os
import json
import subprocess
import sys
from pathlib import Path
from datetime import datetime
# ffmpeg executable used for every conversion in this script.
# Resolution order:
#   1. the FFMPEG_PATH environment variable, when set and non-empty;
#   2. the imageio-ffmpeg binary the script was originally written against,
#      when that file is present on this machine;
#   3. plain "ffmpeg", which subprocess resolves through PATH.
_BUNDLED_FFMPEG = r"C:\Users\sushr\AppData\Roaming\Python\Python313\site-packages\imageio_ffmpeg\binaries\ffmpeg-win-x86_64-v7.1.exe"
FFMPEG = os.environ.get("FFMPEG_PATH") or (
    _BUNDLED_FFMPEG if os.path.isfile(_BUNDLED_FFMPEG) else "ffmpeg"
)

# Data layout, rooted in the directory that contains this script.
DATA_DIR = Path(__file__).parent / "data"
RAW_DIR = DATA_DIR / "raw"  # full-length downloads
CLIPS_DIR = DATA_DIR / "clips"  # extracted clips
LABELS_FILE = DATA_DIR / "labels" / "labels.jsonl"  # one JSON label per line
# Search plan. Each entry is a 4-tuple:
#   (fault_type, appliance, search_query, urgency)
# fault_type, appliance and urgency are copied into every label produced
# from the videos that search_query returns.
QUERIES = [
    # --- Washing machine ---
    ("bearing_failure", "Washing machine", "washing machine bearing noise grinding", "HIGH"),
    ("belt_slip", "Washing machine", "washing machine belt squeal noise", "MEDIUM"),
    ("load_imbalance", "Washing machine", "washing machine unbalanced load vibration", "LOW"),
    ("foreign_object", "Washing machine", "washing machine object stuck rattling noise", "MEDIUM"),
    ("pump_failure", "Washing machine", "washing machine drain pump noise grinding", "MEDIUM"),
    ("motor_failure", "Washing machine", "washing machine motor noise humming whining", "HIGH"),

    # --- Electric fan ---
    ("blade_imbalance", "Electric fan", "electric fan wobbling noise unbalanced blade", "MEDIUM"),
    ("bearing_failure", "Electric fan", "electric fan motor bearing noise grinding", "HIGH"),
    ("blade_strike", "Electric fan", "electric fan hitting noise blade strike", "HIGH"),

    # --- Refrigerator / freezer ---
    ("compressor_failure", "Refrigerator/Freezer", "refrigerator compressor noise loud grinding", "HIGH"),
    ("evaporator_fan", "Refrigerator/Freezer", "refrigerator evaporator fan noise buzzing", "MEDIUM"),
    ("condenser_fan", "Refrigerator/Freezer", "refrigerator condenser fan noise grinding", "MEDIUM"),

    # --- Dishwasher ---
    ("pump_bearing", "Dishwasher", "dishwasher pump noise grinding bearing", "HIGH"),
    ("spray_arm", "Dishwasher", "dishwasher spray arm noise hitting", "LOW"),
    ("drain_pump", "Dishwasher", "dishwasher drain pump noise cavitation", "MEDIUM"),

    # --- Tumble dryer ---
    ("drum_roller", "Tumble dryer", "dryer thumping noise drum roller worn", "HIGH"),
    ("belt_slip", "Tumble dryer", "dryer squealing noise belt slipping", "MEDIUM"),
    ("foreign_object", "Tumble dryer", "dryer rattling noise coins buttons", "LOW"),

    # --- Car engine ---
    ("rod_knock", "Car engine", "car engine rod knock noise deep knocking", "CRITICAL"),
    ("belt_squeal", "Car engine", "car serpentine belt squeal noise", "MEDIUM"),
    ("exhaust_leak", "Car engine", "car exhaust leak tick noise", "MEDIUM"),

    # --- Generic electric motor ---
    ("bearing_failure", "Electric motor (generic)", "electric motor bearing noise grinding", "HIGH"),
    ("brush_arcing", "Electric motor (generic)", "electric motor brush arcing noise sparking", "MEDIUM"),
    ("electrical_hum", "Electric motor (generic)", "electric motor humming loose lamination", "LOW"),
]
def search_youtube(query, max_videos=3):
    """Search YouTube for *query* and return basic info about each hit.

    Args:
        query: Free-text search terms.
        max_videos: Maximum number of search results to request.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``url`` and
        ``duration``, built from the yt-dlp search entries. Entries that
        carry no id are skipped because no watch URL can be built for them.
        The list is empty when ``max_videos`` is below 1 or the search
        fails; failures are printed rather than raised.
    """
    # A search for zero (or fewer) results can never yield anything, so
    # answer immediately instead of sending yt-dlp a request it rejects.
    if max_videos < 1:
        return []

    import yt_dlp

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'extract_flat': True,
        'force_generic_extractor': False,
        'default_search': 'ytsearch',
    }
    results = []
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            search_query = f"ytsearch{max_videos}:{query}"
            info = ydl.extract_info(search_query, download=False)
            entries = (info or {}).get('entries') or []
            for entry in entries:
                if not entry:
                    continue
                video_id = entry.get('id')
                if not video_id:
                    # Without an id the URL would read "watch?v=None".
                    continue
                results.append({
                    'id': video_id,
                    'title': entry.get('title'),
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                    'duration': entry.get('duration'),
                })
    except Exception as e:
        # Best effort: one failed search must not abort the whole run.
        print(f" Search error: {e}")
    return results
def download_audio(video_url, output_path):
    """Download a video's audio track and convert it to 22050 Hz mono WAV.

    Args:
        video_url: URL handed to yt-dlp.
        output_path: ``Path`` without an extension. yt-dlp writes the
            download to ``<output_path>.<ext>`` and the converted audio
            ends up at ``output_path.with_suffix('.wav')``.

    Returns:
        True when ffmpeg exited successfully and the WAV file exists,
        False otherwise. Errors are printed rather than raised.
    """
    import yt_dlp

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'format': 'bestaudio/best',
        'outtmpl': str(output_path) + '.%(ext)s',
    }
    wav_path = output_path.with_suffix('.wav')
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(video_url, download=True)

        # Find downloaded file
        for ext in ['webm', 'm4a', 'mp3', 'opus', 'wav']:
            candidate = output_path.with_suffix(f'.{ext}')
            if not candidate.exists():
                continue

            source = candidate
            if candidate == wav_path:
                # ffmpeg cannot read and write the same file, so move a
                # .wav download aside before converting it.
                source = wav_path.with_name(wav_path.stem + '.src.wav')
                candidate.replace(source)

            # Convert to wav using our ffmpeg
            cmd = [FFMPEG, "-y", "-i", str(source),
                   "-ar", "22050", "-ac", "1", str(wav_path)]
            converted = False
            try:
                proc = subprocess.run(cmd, capture_output=True, timeout=30)
                converted = proc.returncode == 0 and wav_path.exists()
            finally:
                # The download is not needed again, even when ffmpeg fails
                # or times out; a failed run must not leave a partial WAV
                # that a later existence check would mistake for success.
                source.unlink(missing_ok=True)
                if not converted:
                    wav_path.unlink(missing_ok=True)
            if converted:
                return True
        return False
    except Exception as e:
        print(f" Download error: {e}")
        return False
def extract_clip(audio_path, output_path, start_sec=0, duration_sec=10):
    """Extract a clip from an audio file using ffmpeg.

    The clip is resampled to 22050 Hz mono.

    Args:
        audio_path: Source audio file.
        output_path: ``Path`` of the clip to write; its parent directory is
            created when missing.
        start_sec: Offset into the source, in seconds.
        duration_sec: Clip length, in seconds.

    Returns:
        True when ffmpeg exited successfully and ``output_path`` exists,
        False otherwise. Errors are printed rather than raised.
    """
    cmd = [
        FFMPEG, "-y",
        "-i", str(audio_path),
        "-ss", str(start_sec),
        "-t", str(duration_sec),
        "-ar", "22050",
        "-ac", "1",
        str(output_path)
    ]
    try:
        # ffmpeg does not create missing output directories.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        proc = subprocess.run(cmd, capture_output=True, timeout=30)
    except Exception as e:
        print(f" FFmpeg error: {e}")
        return False

    if proc.returncode != 0:
        # A file left over from an earlier run must not be reported as a
        # fresh clip when this invocation failed.
        print(f" FFmpeg error: exit code {proc.returncode}")
        return False
    return output_path.exists()
def collect_fault_audio(dry_run=False, max_videos=3):
    """Run the collection pipeline over every entry in ``QUERIES``.

    For each query the matching videos are searched. Unless ``dry_run`` is
    set, each video's audio is downloaded, up to two 10-second clips are
    cut from it, a label is recorded per clip, the raw download is removed
    and the labels file is rewritten after every query.

    Args:
        dry_run: When true, only print the search results; nothing is
            downloaded or written.
        max_videos: Maximum number of videos requested per query.

    Returns:
        The list of label dicts created during this run (empty for a dry
        run).
    """
    labels = []
    total_clips = 0
    print(f"Collecting {len(QUERIES)} fault types, max {max_videos} videos each")
    print(f"Output: {CLIPS_DIR}")
    print()

    if not dry_run:
        # ffmpeg will not create these, and without them every conversion
        # and clip extraction fails.
        RAW_DIR.mkdir(parents=True, exist_ok=True)
        CLIPS_DIR.mkdir(parents=True, exist_ok=True)

    for i, (fault_type, appliance, query, urgency) in enumerate(QUERIES):
        print(f"[{i+1}/{len(QUERIES)}] {fault_type} ({appliance})")
        print(f" Query: {query}")
        videos = search_youtube(query, max_videos)
        print(f" Found: {len(videos)} videos")

        if dry_run:
            for v in videos:
                # The title may be None; fall back to '' for display only.
                title = v['title'] or ''
                print(f" {title[:60]} ({v['duration']}s)")
            continue

        for v in videos:
            if not v.get('id'):
                # No id means no usable URL or file name.
                continue
            title = v['title'] or ''
            print(f" Downloading: {title[:50]}...")

            # Download full audio
            raw_path = RAW_DIR / f"{v['id']}"
            if not download_audio(v['url'], raw_path):
                continue

            # Find the downloaded file (may have different extension)
            raw_file = None
            for ext in ['.wav', '.mp3', '.m4a', '.webm']:
                candidate = raw_path.with_suffix(ext)
                if candidate.exists():
                    raw_file = candidate
                    break
            if not raw_file:
                print(f" No audio file found for {v['id']}")
                continue

            # Extract 10-second clips at different start points
            duration = v.get('duration', 60) or 60  # unknown length: assume 60 s
            clip_starts = [5, 15, 30, 45]  # Skip intros
            clips_made = 0
            for start in clip_starts:
                if start + 10 > duration:
                    continue
                clip_name = f"{v['id']}_{start:03d}.wav"
                clip_path = CLIPS_DIR / clip_name
                if extract_clip(raw_file, clip_path, start, 10):
                    label = {
                        "file": clip_name,
                        "fault_type": fault_type,
                        "appliance": appliance,
                        "urgency": urgency,
                        "source": v['url'],
                        "source_title": v['title'],
                        "start_sec": start,
                        "collected_at": datetime.now().isoformat(),
                    }
                    labels.append(label)
                    total_clips += 1
                    clips_made += 1
                    if clips_made >= 2:  # Max 2 clips per video
                        break
            print(f" Extracted {clips_made} clips")

            # Clean up raw file; a failure here is reported but not fatal.
            try:
                raw_file.unlink()
            except OSError as e:
                print(f" Could not remove {raw_file}: {e}")

        # Save labels after each query
        save_labels(labels)

    print(f"\n{'='*60}")
    print(f"Collection complete!")
    print(f"Total clips: {total_clips}")
    print(f"Labels: {LABELS_FILE}")

    # Print summary
    from collections import Counter
    fault_counts = Counter(l['fault_type'] for l in labels)
    print(f"\nClips per fault type:")
    for fault, count in fault_counts.most_common():
        print(f" {fault}: {count}")
    return labels
def save_labels(labels):
    """Write *labels* to ``LABELS_FILE`` as JSON Lines.

    The parent directory is created when missing and any existing file is
    replaced; each label becomes one JSON document on its own line.
    """
    target_dir = LABELS_FILE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with open(LABELS_FILE, 'w') as out:
        out.writelines(json.dumps(record) + '\n' for record in labels)
def load_labels():
    """Load labels from the JSONL file at ``LABELS_FILE``.

    Returns:
        A list with one parsed JSON value per non-blank line, in file
        order, or an empty list when the file does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if not LABELS_FILE.exists():
        return []
    # JSON text is UTF-8; decoding explicitly keeps the result independent
    # of the platform's default encoding.
    with open(LABELS_FILE, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
if __name__ == "__main__":
    import argparse

    # Command-line entry point: read the two flags and start the collector.
    cli = argparse.ArgumentParser(
        description="Collect appliance fault audio from YouTube"
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview search results only",
    )
    cli.add_argument(
        "--max-videos",
        type=int,
        default=3,
        help="Max videos per query",
    )
    options = cli.parse_args()
    collect_fault_audio(
        dry_run=options.dry_run,
        max_videos=options.max_videos,
    )