#!/usr/bin/env python3
"""
gather_v2.py — smarter, parallel image gatherer for the drone-falcon dataset.
Improvements over gather_images.py (v1):
- Parallel downloads (50 threads instead of sequential)
- YouTube frame extraction via yt-dlp + ffmpeg (the killer feature for combat footage)
- Optional inline Qwen filter — only saves images Qwen says YES to
- Perceptual-hash dedup across sources (catches the same image from different sites)
- Resumable via local manifest
Sources:
- DuckDuckGo image search (broad, noisy)
- Wikimedia Commons (CC, niche, slower)
- YouTube videos / playlists (gold for combat footage)
Outputs:
drone-dataset-v2/<bucket>/<file>.jpg ← local mirror
drone-dataset-v2/manifest.json ← every file with provenance
Usage:
# Web search only (DDG + Wikimedia)
python3 gather_v2.py --bucket positive/fiber_spool_drone \\
--query "fiber optic drone Ukraine" --query "tethered fpv drone" \\
--max-per-query 100
# YouTube frame extraction
python3 gather_v2.py --bucket positive/fiber_spool_drone \\
--youtube "https://youtube.com/playlist?list=ABC123" \\
--fps 1 --max-frames-per-video 200
# Inline Qwen filter (only saves YES images)
python3 gather_v2.py --bucket positive/fiber_spool_drone \\
--query "fiber optic drone" --filter
"""
import argparse
import base64
import hashlib
import io
import json
import os
import shutil
import subprocess
import time
import urllib.request
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from PIL import Image
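# Third-party deps: Pillow (PIL) always; yt-dlp and imageio-ffmpeg only for
# --youtube; a running OpenAI-compatible server at QWEN_URL only for --filter.
# The `experiments` module imported under --experiment is local to this repo.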
# ============================================================
# CONFIG
# ============================================================
USER_AGENT = "data-label-factory-gather/0.1 (research project)"
# Override via env vars (same as the rest of the factory CLI)
M4_QWEN_URL = os.environ.get("QWEN_URL", "http://localhost:8291")
QWEN_MODEL_PATH = os.environ.get(
"QWEN_MODEL_PATH", "mlx-community/Qwen2.5-VL-3B-Instruct-4bit"
)
QWEN_FILTER_PROMPT = (
"Look at this image. Does it show a drone, a cable spool, or a wound fiber optic cable?\n"
"Answer with exactly one word: YES or NO.\n"
"YES if you see ANY of: a drone, a quadcopter, a cable reel, a fiber spool, a wound cable.\n"
"NO if the main subject is something else."
)
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".gif"}
# ============================================================
# DUCKDUCKGO IMAGE SEARCH (no API key)
# ============================================================
def ddg_search(query: str, max_results: int = 50) -> list:
"""Returns list of dicts: {url, source, title, page}."""
import re
results = []
headers = {"User-Agent": USER_AGENT}
# Step 1: get vqd token
try:
token_url = f"https://duckduckgo.com/?q={urllib.parse.quote(query)}&iax=images&ia=images"
req = urllib.request.Request(token_url, headers=headers)
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8", errors="ignore")
m = re.search(r'vqd=["\']?([\d-]+)["\']?', html)
if not m:
return results
vqd = m.group(1)
except Exception as e:
print(f" ddg token err: {e}")
return results
# Step 2: paginate i.js
seen = set()
next_url = None
while len(results) < max_results:
if next_url is None:
params = {"l": "us-en", "o": "json", "q": query, "vqd": vqd, "f": ",,,,,", "p": "1"}
url = f"https://duckduckgo.com/i.js?{urllib.parse.urlencode(params)}"
else:
url = "https://duckduckgo.com" + next_url
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read())
except Exception as e:
print(f" ddg page err: {e}")
break
items = data.get("results", [])
if not items:
break
for it in items:
img_url = it.get("image")
if not img_url or img_url in seen:
continue
seen.add(img_url)
results.append({
"url": img_url,
"source": "duckduckgo",
"title": it.get("title", "")[:200],
"page": it.get("url", ""),
"license": "unknown",
"query": query,
})
if len(results) >= max_results:
break
next_url = data.get("next")
if not next_url:
break
time.sleep(0.3)
return results
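# Example usage (network access assumed; dict shape matches the docstring):
#   hits = ddg_search("fiber optic drone Ukraine", max_results=25)
#   urls = [h["url"] for h in hits]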
# ============================================================
# WIKIMEDIA COMMONS (CC, free)
# ============================================================
def wikimedia_search(query: str, max_results: int = 50) -> list:
params = {
"action": "query", "format": "json",
"generator": "search", "gsrsearch": f"filetype:bitmap {query}",
"gsrnamespace": "6", "gsrlimit": str(min(50, max_results)),
"prop": "imageinfo", "iiprop": "url|extmetadata|size",
}
url = f"https://commons.wikimedia.org/w/api.php?{urllib.parse.urlencode(params)}"
results = []
try:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=20) as resp:
data = json.loads(resp.read())
except Exception as e:
print(f" wikimedia err: {e}")
return results
pages = (data.get("query") or {}).get("pages") or {}
for _, p in pages.items():
ii = (p.get("imageinfo") or [{}])[0]
img_url = ii.get("url")
if not img_url:
continue
ext = (ii.get("extmetadata") or {})
license_name = (ext.get("LicenseShortName") or {}).get("value", "")
results.append({
"url": img_url,
"source": "wikimedia",
"title": p.get("title", ""),
"page": f"https://commons.wikimedia.org/wiki/{urllib.parse.quote(p.get('title', ''))}",
"license": license_name,
"query": query,
})
if len(results) >= max_results:
break
return results
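# NOTE: this is a single API request with no query continuation, so results
# are capped at gsrlimit (50) per call regardless of max_results.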
# ============================================================
# YOUTUBE FRAME EXTRACTION (the killer feature)
# ============================================================
def youtube_extract_frames(
video_url: str,
out_dir: str,
fps: float = 1.0,
max_frames: int = 200,
    cookies_from_browser: str | None = None,
) -> list:
"""Download a YouTube video, extract frames at given fps. Returns list of frame paths.
Uses yt-dlp + ffmpeg (via imageio_ffmpeg's bundled binary).
"""
import yt_dlp
import imageio_ffmpeg
ffmpeg_bin = imageio_ffmpeg.get_ffmpeg_exe()
os.makedirs(out_dir, exist_ok=True)
work_dir = os.path.join(out_dir, "_video_tmp")
os.makedirs(work_dir, exist_ok=True)
# Download with yt-dlp — android+web player clients bypass most YT bot detection
print(f" yt-dlp downloading: {video_url}")
ydl_opts = {
"format": "worstvideo[height>=480]/worst",
"outtmpl": os.path.join(work_dir, "%(id)s.%(ext)s"),
"quiet": True,
"no_warnings": True,
"noplaylist": True,
"extractor_args": {"youtube": {"player_client": ["android", "web"]}},
}
if cookies_from_browser:
ydl_opts["cookiesfrombrowser"] = (cookies_from_browser,)
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
video_id = info.get("id", "video")
title = info.get("title", "")
except Exception as e:
print(f" yt-dlp failed: {e}")
return []
# Find downloaded file
    # exclude frame jpgs so a rerun over a dirty _video_tmp doesn't grab them
    video_files = [os.path.join(work_dir, f) for f in os.listdir(work_dir) if f.startswith(video_id) and not f.endswith(".jpg")]
if not video_files:
print(f" no downloaded video found in {work_dir}")
return []
video_file = video_files[0]
# Extract frames via ffmpeg
print(f" ffmpeg extracting frames at {fps} fps from {video_file}")
frame_pattern = os.path.join(work_dir, f"{video_id}_%05d.jpg")
cmd = [
ffmpeg_bin, "-y", "-i", video_file,
"-vf", f"fps={fps}",
"-frames:v", str(max_frames),
"-q:v", "3",
frame_pattern,
]
try:
subprocess.run(cmd, capture_output=True, check=True, timeout=600)
except Exception as e:
print(f" ffmpeg failed: {e}")
return []
frames = sorted(f for f in os.listdir(work_dir) if f.startswith(video_id + "_") and f.endswith(".jpg"))
out_frames = []
for i, fr in enumerate(frames):
src = os.path.join(work_dir, fr)
dest = os.path.join(out_dir, f"yt_{video_id}_{i:05d}.jpg")
shutil.move(src, dest)
out_frames.append({
"path": dest,
"source": "youtube",
"video_id": video_id,
"video_title": title,
"video_url": video_url,
"frame_index": i,
"license": "see source video",
})
# Clean up downloaded video
try:
os.unlink(video_file)
except Exception:
pass
print(f" → extracted {len(out_frames)} frames")
return out_frames
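# NOTE: youtube_extract_frames() assumes a single video. With noplaylist=True a
# combined watch?v=..&list=.. URL resolves to just the video, but a bare playlist
# URL yields a playlist info dict whose id won't match any downloaded file.
# A sketch of upstream expansion (hypothetical helper, not wired into main()):
def expand_playlist(playlist_url: str) -> list:
    """Return individual video URLs for a playlist without downloading anything."""
    import yt_dlp
    opts = {"quiet": True, "skip_download": True, "extract_flat": "in_playlist"}
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(playlist_url, download=False)
    entries = info.get("entries") or []
    if not entries:  # plain video URL, nothing to expand
        return [playlist_url]
    return [e.get("url") or f"https://www.youtube.com/watch?v={e['id']}" for e in entries]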
# ============================================================
# QWEN INLINE FILTER (optional)
# ============================================================
def qwen_yes_no(image_path: str, m4_url: str = M4_QWEN_URL, timeout: int = 30) -> tuple:
"""Returns (verdict, raw_answer). verdict ∈ {YES, NO, UNKNOWN, ERROR}."""
try:
img = Image.open(image_path).convert("RGB")
max_dim = 1024
if max(img.size) > max_dim:
ratio = max_dim / max(img.size)
img = img.resize((int(img.size[0] * ratio), int(img.size[1] * ratio)), Image.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="PNG")
b64 = base64.b64encode(buf.getvalue()).decode()
payload = {
"model": QWEN_MODEL_PATH,
"messages": [{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
{"type": "text", "text": QWEN_FILTER_PROMPT},
],
}],
"max_tokens": 12, "temperature": 0,
}
req = urllib.request.Request(
f"{m4_url}/v1/chat/completions",
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=timeout) as r:
data = json.loads(r.read())
ans = data["choices"][0]["message"]["content"].strip().upper()
first = ans.split()[0].rstrip(".,") if ans else ""
verdict = "YES" if "YES" in first else ("NO" if "NO" in first else "UNKNOWN")
return verdict, ans
except Exception as e:
return "ERROR", str(e)
# ============================================================
# DOWNLOAD + DEDUP
# ============================================================
def url_filename(url: str, source: str) -> str:
h = hashlib.sha1(url.encode()).hexdigest()[:12]
ext = os.path.splitext(urllib.parse.urlparse(url).path)[1].lower()
if ext not in IMAGE_EXTS:
ext = ".jpg"
return f"{source}_{h}{ext}"
def download_one(url: str, dest: str, timeout: int = 30) -> tuple:
try:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = resp.read()
if len(data) < 1024:
return False, 0, "too small"
with open(dest, "wb") as f:
f.write(data)
return True, len(data), None
except Exception as e:
return False, 0, str(e)
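# download_one() only size-checks the payload, so an HTML error page can land
# on disk as a fake .jpg. A decode check would catch that; hypothetical sketch
# using Pillow's verify(), not wired into the download path:
def is_valid_image(path: str) -> bool:
    try:
        with Image.open(path) as im:
            im.verify()  # structural header check; cheaper than a full decode
        return True
    except Exception:
        return False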
def perceptual_hash(image_path: str) -> str:
"""8x8 average-hash for fast cross-source dedup."""
try:
img = Image.open(image_path).convert("L").resize((8, 8), Image.LANCZOS)
pixels = list(img.getdata())
avg = sum(pixels) / len(pixels)
bits = "".join("1" if p > avg else "0" for p in pixels)
return hex(int(bits, 2))[2:].zfill(16)
except Exception:
return ""
# ============================================================
# MAIN
# ============================================================
def main():
p = argparse.ArgumentParser()
p.add_argument("--out", default="drone-dataset-v2", help="Image output root (shared across experiments)")
p.add_argument("--bucket", required=True, help="Bucket subpath, e.g. positive/fiber_spool_drone")
p.add_argument("--experiment", default="",
help="Optional experiment name; if set, creates experiments/<YYYY-MM-DD_HHMMSS>_<name>/")
p.add_argument("--query", action="append", default=[],
help="Search query (repeatable). Hits DDG + Wikimedia.")
p.add_argument("--youtube", action="append", default=[],
help="YouTube video URL or playlist URL (repeatable). Extracts frames.")
p.add_argument("--fps", type=float, default=1.0, help="Frames per second to extract from videos")
p.add_argument("--max-frames-per-video", type=int, default=200)
p.add_argument("--max-per-query", type=int, default=100)
p.add_argument("--workers", type=int, default=50, help="Parallel download threads")
p.add_argument("--filter", action="store_true",
help="Run Qwen YES/NO filter on each downloaded image, skip NO")
p.add_argument("--cookies-from-browser", default=None,
help="For YouTube: chrome|safari|firefox — use browser cookies for age-gated/login videos")
args = p.parse_args()
bucket_dir = os.path.join(args.out, args.bucket)
os.makedirs(bucket_dir, exist_ok=True)
# Set up the dated experiment dir if requested
experiment_dir = None
if args.experiment or "EXPERIMENT_DIR" in os.environ:
from experiments import make_experiment_dir, write_readme, write_config, update_latest_symlink
if "EXPERIMENT_DIR" in os.environ:
experiment_dir = os.environ["EXPERIMENT_DIR"]
os.makedirs(os.path.join(experiment_dir, "gather"), exist_ok=True)
else:
experiment_dir = make_experiment_dir(args.experiment)
write_readme(
experiment_dir,
name=args.experiment,
description=f"gather_v2 run: bucket={args.bucket}, queries={args.query}, youtube={len(args.youtube)} videos",
params=vars(args),
)
write_config(experiment_dir, vars(args))
update_latest_symlink(experiment_dir)
manifest_path = os.path.join(experiment_dir, "gather", "manifest.json")
print(f"Experiment dir: {experiment_dir}")
else:
manifest_path = os.path.join(args.out, "manifest.json")
manifest = []
if os.path.exists(manifest_path):
with open(manifest_path) as f:
manifest = json.load(f)
print(f"Resumed: {len(manifest)} files in manifest")
# Track URL + perceptual-hash dedup sets
seen_urls = {m["url"] for m in manifest if "url" in m}
seen_hashes = {m["phash"] for m in manifest if m.get("phash")}
# ===== Step 1: web search =====
web_hits = []
for q in args.query:
print(f"\n[search] {q!r}")
ddg_results = ddg_search(q, max_results=args.max_per_query)
wiki_results = wikimedia_search(q, max_results=args.max_per_query)
print(f" DDG: {len(ddg_results)} Wikimedia: {len(wiki_results)}")
web_hits.extend(ddg_results)
web_hits.extend(wiki_results)
# Filter out duplicates by URL
web_hits = [h for h in web_hits if h["url"] not in seen_urls]
print(f"\n {len(web_hits)} new web URLs to download (after dedup)")
# ===== Step 2: parallel download =====
downloaded = []
if web_hits:
print(f"\n[download] {len(web_hits)} files via {args.workers} threads...")
t0 = time.time()
        n_ok, n_skip, n_err = 0, 0, 0
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            futures = {}
            for hit in web_hits:
                fname = url_filename(hit["url"], hit["source"])
                dest = os.path.join(bucket_dir, fname)
                if os.path.exists(dest):
                    n_skip += 1  # already on disk from a previous run
                    continue
                futures[pool.submit(download_one, hit["url"], dest)] = (hit, dest)
for fut in as_completed(futures):
hit, dest = futures[fut]
ok, nbytes, err = fut.result()
if ok:
n_ok += 1
downloaded.append({**hit, "path": dest, "bytes": nbytes})
else:
n_err += 1
elapsed = time.time() - t0
print(f" downloaded: {n_ok} new, {n_err} errors in {elapsed:.0f}s")
# ===== Step 3: YouTube frame extraction =====
youtube_hits = []
for video_url in args.youtube:
print(f"\n[youtube] {video_url}")
frames = youtube_extract_frames(
video_url, bucket_dir,
fps=args.fps, max_frames=args.max_frames_per_video,
cookies_from_browser=args.cookies_from_browser,
)
youtube_hits.extend(frames)
# ===== Step 4: dedup via perceptual hash =====
if downloaded or youtube_hits:
print(f"\n[dedup] computing perceptual hashes...")
for entry in downloaded + youtube_hits:
phash = perceptual_hash(entry["path"])
entry["phash"] = phash
if phash and phash in seen_hashes:
# duplicate — remove the file
try:
os.unlink(entry["path"])
except Exception:
pass
entry["dropped"] = "dup_phash"
else:
seen_hashes.add(phash)
n_dropped = sum(1 for e in downloaded + youtube_hits if e.get("dropped"))
print(f" dropped {n_dropped} duplicates")
# ===== Step 5: Optional Qwen filter =====
survivors = []
for entry in downloaded + youtube_hits:
if entry.get("dropped"):
continue
if not args.filter:
survivors.append(entry)
continue
verdict, raw = qwen_yes_no(entry["path"])
entry["qwen_verdict"] = verdict
entry["qwen_answer"] = raw
if verdict != "YES":
try:
os.unlink(entry["path"])
except Exception:
pass
entry["dropped"] = f"qwen_{verdict}"
else:
survivors.append(entry)
# ===== Save manifest + stats =====
for entry in downloaded + youtube_hits:
entry["bucket"] = args.bucket
manifest.append(entry)
os.makedirs(os.path.dirname(manifest_path), exist_ok=True)
with open(manifest_path, "w") as f:
json.dump(manifest, f, indent=2)
# If we're inside an experiment dir, write stats.json next to the manifest
if experiment_dir:
stats = {
"bucket": args.bucket,
"queries": args.query,
"youtube_urls": args.youtube,
"web_hits_found": len(web_hits),
"downloaded": len(downloaded),
"youtube_frames": len(youtube_hits),
"dropped_dup": sum(1 for e in (downloaded + youtube_hits) if e.get("dropped") == "dup_phash"),
"dropped_qwen": sum(1 for e in (downloaded + youtube_hits) if e.get("dropped", "").startswith("qwen")),
"survivors": len(survivors),
"filter_enabled": args.filter,
"manifest_total": len(manifest),
"completed_at": datetime.now().isoformat(timespec="seconds") if 'datetime' in dir() else None,
}
try:
from datetime import datetime as _dt
stats["completed_at"] = _dt.now().isoformat(timespec="seconds")
except Exception:
pass
stats_path = os.path.join(experiment_dir, "gather", "stats.json")
with open(stats_path, "w") as f:
json.dump(stats, f, indent=2)
print(f" stats: {stats_path}")
# ===== Summary =====
print("\n" + "=" * 60)
print("DONE")
print("=" * 60)
print(f" bucket: {args.bucket}")
print(f" web hits found: {len(web_hits)}")
print(f" downloaded: {len(downloaded)}")
print(f" youtube frames: {len(youtube_hits)}")
if args.filter:
n_yes = sum(1 for e in downloaded + youtube_hits if e.get("qwen_verdict") == "YES")
n_no = sum(1 for e in downloaded + youtube_hits if e.get("qwen_verdict") == "NO")
print(f" qwen filter: YES={n_yes} NO={n_no}")
print(f" survivors: {len(survivors)}")
print(f" manifest: {manifest_path} ({len(manifest)} total)")
if __name__ == "__main__":
main()