| |
| """ |
| gather_v2.py — smarter, parallel image gatherer for the drone-falcon dataset. |
| |
| Improvements over gather_images.py (v1): |
  - Parallel downloads (50 threads by default, see --workers; v1 was sequential)
| - YouTube frame extraction via yt-dlp + ffmpeg (the killer feature for combat footage) |
| - Optional inline Qwen filter — only saves images Qwen says YES to |
| - Perceptual-hash dedup across sources (catches the same image from different sites) |
| - Resumable via local manifest |
| |
| Sources: |
| - DuckDuckGo image search (broad, noisy) |
| - Wikimedia Commons (CC, niche, slower) |
| - YouTube videos / playlists (gold for combat footage) |
| |
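Outputs:
  drone-dataset-v2/<bucket>/<file>.jpg  ← local mirror (root directory set by --out)
  drone-dataset-v2/manifest.json        ← every file with provenance
  With --experiment (or $EXPERIMENT_DIR set), the manifest and a stats.json are
  written to <experiment_dir>/gather/ instead.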
| |
| Usage: |
| # Web search only (DDG + Wikimedia) |
| python3 gather_v2.py --bucket positive/fiber_spool_drone \\ |
| --query "fiber optic drone Ukraine" --query "tethered fpv drone" \\ |
| --max-per-query 100 |
| |
| # YouTube frame extraction |
| python3 gather_v2.py --bucket positive/fiber_spool_drone \\ |
| --youtube "https://youtube.com/playlist?list=ABC123" \\ |
| --fps 1 --max-frames-per-video 200 |
| |
| # Inline Qwen filter (only saves YES images) |
| python3 gather_v2.py --bucket positive/fiber_spool_drone \\ |
| --query "fiber optic drone" --filter |
| """ |
|
|
| import argparse |
| import base64 |
| import hashlib |
| import io |
| import json |
| import os |
| import shutil |
| import subprocess |
| import time |
| import urllib.request |
| import urllib.parse |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from pathlib import Path |
| from PIL import Image |
|
|
|
|
| |
| |
| |
|
|
# Sent as the User-Agent header on every search and download request.
USER_AGENT = "data-label-factory-gather/0.1 (research project)"

# OpenAI-compatible chat endpoint (and model id) used by the optional --filter
# step; both can be overridden through the environment.
M4_QWEN_URL = os.environ.get("QWEN_URL", "http://localhost:8291")
QWEN_MODEL_PATH = os.environ.get("QWEN_MODEL_PATH", "mlx-community/Qwen2.5-VL-3B-Instruct-4bit")

# Prompt sent alongside every image; the first word of the reply is parsed as the verdict.
QWEN_FILTER_PROMPT = "\n".join((
    "Look at this image. Does it show a drone, a cable spool, or a wound fiber optic cable?",
    "Answer with exactly one word: YES or NO.",
    "YES if you see ANY of: a drone, a quadcopter, a cable reel, a fiber spool, a wound cable.",
    "NO if the main subject is something else.",
))

# URL path extensions url_filename keeps as-is; anything else is saved with ".jpg".
IMAGE_EXTS = {".bmp", ".gif", ".jpeg", ".jpg", ".png", ".webp"}
|
|
|
|
| |
| |
| |
|
|
|
|
def _ddg_page_url(query: str, vqd: str, next_url: str = None) -> str:
    """Build the URL of one DuckDuckGo image-results (``i.js``) page.

    Without *next_url* this is the first page. Otherwise *next_url* is the
    ``next`` cursor from the previous JSON page. It is a relative reference
    that may or may not start with "/", so it is resolved with ``urljoin``
    instead of string concatenation. The ``vqd`` token is appended when the
    cursor does not already carry one.
    """
    if not next_url:
        params = {"l": "us-en", "o": "json", "q": query, "vqd": vqd, "f": ",,,,,", "p": "1"}
        return f"https://duckduckgo.com/i.js?{urllib.parse.urlencode(params)}"
    url = urllib.parse.urljoin("https://duckduckgo.com/", next_url)
    if "vqd" not in urllib.parse.parse_qs(urllib.parse.urlsplit(url).query):
        url += ("&" if "?" in url else "?") + urllib.parse.urlencode({"vqd": vqd})
    return url


def ddg_search(query: str, max_results: int = 50) -> list:
    """Search DuckDuckGo Images for *query*.

    Returns up to *max_results* dicts with keys url, source ("duckduckgo"),
    title (at most 200 chars), page, license ("unknown") and query. Image URLs
    are de-duplicated within this call. Request or decode errors are printed
    and end the search early; whatever was collected so far is returned.
    """
    import re

    results = []
    headers = {"User-Agent": USER_AGENT}

    # The JSON endpoint needs a per-query "vqd" token scraped from the HTML page.
    try:
        token_url = f"https://duckduckgo.com/?q={urllib.parse.quote(query)}&iax=images&ia=images"
        req = urllib.request.Request(token_url, headers=headers)
        with urllib.request.urlopen(req, timeout=15) as resp:
            html = resp.read().decode("utf-8", errors="ignore")
        m = re.search(r'vqd=["\']?([\d-]+)["\']?', html)
        if not m:
            return results
        vqd = m.group(1)
    except Exception as e:
        print(f" ddg token err: {e}")
        return results

    # Page through i.js results until we have enough or DDG stops returning a cursor.
    seen = set()
    next_url = None
    while len(results) < max_results:
        url = _ddg_page_url(query, vqd, next_url)
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=15) as resp:
                data = json.loads(resp.read())
        except Exception as e:
            print(f" ddg page err: {e}")
            break
        items = data.get("results", [])
        if not items:
            break
        for it in items:
            img_url = it.get("image")
            if not img_url or img_url in seen:
                continue
            seen.add(img_url)
            results.append({
                "url": img_url,
                "source": "duckduckgo",
                # "or" also covers an explicit null title, which [:200] would crash on.
                "title": (it.get("title") or "")[:200],
                "page": it.get("url", ""),
                "license": "unknown",
                "query": query,
            })
            if len(results) >= max_results:
                break
        next_url = data.get("next")
        if not next_url:
            break
        time.sleep(0.3)  # be polite between result pages
    return results
|
|
|
|
| |
| |
| |
|
|
|
|
def wikimedia_search(query: str, max_results: int = 50) -> list:
    """Search Wikimedia Commons for bitmap files matching *query*.

    Makes a single MediaWiki API request (``generator=search`` over the File
    namespace, ``gsrlimit`` capped at 50) and returns dicts with keys url,
    source ("wikimedia"), title, page, license and query. A request or decode
    error is printed and yields an empty list.
    """
    api_params = {
        "action": "query",
        "format": "json",
        "generator": "search",
        "gsrsearch": f"filetype:bitmap {query}",
        "gsrnamespace": "6",
        "gsrlimit": str(min(50, max_results)),
        "prop": "imageinfo",
        "iiprop": "url|extmetadata|size",
    }
    api_url = "https://commons.wikimedia.org/w/api.php?" + urllib.parse.urlencode(api_params)

    hits = []
    try:
        request = urllib.request.Request(api_url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(request, timeout=20) as response:
            payload = json.loads(response.read())
    except Exception as exc:
        print(f" wikimedia err: {exc}")
        return hits

    page_map = (payload.get("query") or {}).get("pages") or {}
    for page in page_map.values():
        info = (page.get("imageinfo") or [{}])[0]
        file_url = info.get("url")
        if not file_url:
            continue
        metadata = info.get("extmetadata") or {}
        page_title = page.get("title", "")
        hits.append({
            "url": file_url,
            "source": "wikimedia",
            "title": page_title,
            "page": f"https://commons.wikimedia.org/wiki/{urllib.parse.quote(page_title)}",
            "license": (metadata.get("LicenseShortName") or {}).get("value", ""),
            "query": query,
        })
        if len(hits) >= max_results:
            break
    return hits
|
|
|
|
| |
| |
| |
|
|
|
|
def _iter_video_infos(info: dict):
    """Yield each per-video info dict in a yt-dlp result, flattening (nested) playlists."""
    entries = info.get("entries")
    if entries is None:
        yield info
        return
    for entry in entries:
        if entry:  # skip empty/None entries
            yield from _iter_video_infos(entry)


def _is_frame_file(name: str, video_id: str) -> bool:
    """True for ffmpeg output names of the exact form "<video_id>_<digits>.jpg"."""
    prefix = video_id + "_"
    return (
        name.startswith(prefix)
        and name.endswith(".jpg")
        and name[len(prefix):-len(".jpg")].isdigit()
    )


def _remove_frame_files(work_dir: str, video_id: str) -> None:
    """Best-effort removal of frame files for *video_id* left in *work_dir*."""
    for name in os.listdir(work_dir):
        if _is_frame_file(name, video_id):
            try:
                os.unlink(os.path.join(work_dir, name))
            except OSError:
                pass


def _downloaded_video_path(video: dict, work_dir: str):
    """Return the local file yt-dlp wrote for *video*, or None if none is found."""
    for download in video.get("requested_downloads") or []:
        path = download.get("filepath")
        if path and os.path.exists(path):
            return path
    # Fallback: outtmpl is "<id>.<ext>". Requiring the "." after the id keeps our own
    # "<id>_NNNNN.jpg" frames and other ids that merely share a prefix from matching.
    prefix = f"{video.get('id', 'video')}."
    names = sorted(
        n for n in os.listdir(work_dir)
        if n.startswith(prefix) and not n.endswith((".part", ".ytdl"))
    )
    return os.path.join(work_dir, names[0]) if names else None


def _frames_for_video(video: dict, video_url: str, work_dir: str, out_dir: str,
                      ffmpeg_bin: str, fps: float, max_frames: int) -> list:
    """Extract frames of one downloaded video into *out_dir*; return their manifest entries.

    The downloaded video file and any leftover frame files are deleted afterwards,
    whether or not extraction succeeded.
    """
    video_id = video.get("id", "video")
    title = video.get("title", "")
    video_file = _downloaded_video_path(video, work_dir)
    if not video_file:
        print(f" no downloaded video found in {work_dir}")
        return []

    try:
        # Frames left behind by an interrupted earlier run would otherwise be picked up below.
        _remove_frame_files(work_dir, video_id)
        print(f" ffmpeg extracting frames at {fps} fps from {video_file}")
        frame_pattern = os.path.join(work_dir, f"{video_id}_%05d.jpg")
        cmd = [
            ffmpeg_bin, "-y", "-i", video_file,
            "-vf", f"fps={fps}",
            "-frames:v", str(max_frames),
            "-q:v", "3",
            frame_pattern,
        ]
        try:
            subprocess.run(cmd, capture_output=True, check=True, timeout=600)
        except Exception as e:
            print(f" ffmpeg failed: {e}")
            return []

        frames = sorted(n for n in os.listdir(work_dir) if _is_frame_file(n, video_id))
        out_frames = []
        for i, name in enumerate(frames):
            dest = os.path.join(out_dir, f"yt_{video_id}_{i:05d}.jpg")
            shutil.move(os.path.join(work_dir, name), dest)
            out_frames.append({
                "path": dest,
                "source": "youtube",
                "video_id": video_id,
                "video_title": title,
                "video_url": video_url,
                "frame_index": i,
                "license": "see source video",
            })
        return out_frames
    finally:
        _remove_frame_files(work_dir, video_id)
        try:
            os.unlink(video_file)
        except OSError:
            pass


def youtube_extract_frames(
    video_url: str,
    out_dir: str,
    fps: float = 1.0,
    max_frames: int = 200,
    cookies_from_browser: str = None,
) -> list:
    """Download a YouTube video (or every video of a playlist) and extract JPEG frames.

    Uses yt-dlp for the download and ffmpeg (imageio_ffmpeg's bundled binary)
    to sample frames at *fps*. At most *max_frames* frames are kept per video.
    Frames are written to *out_dir* as ``yt_<video_id>_<NNNNN>.jpg``, and the
    scratch directory ``<out_dir>/_video_tmp`` is removed once it is empty.

    Returns a list of manifest-entry dicts (path, source, video_id, video_title,
    video_url, frame_index, license), one per saved frame. For playlist entries
    ``video_url`` is the entry's own page URL when yt-dlp reports one. yt-dlp and
    ffmpeg failures are printed and contribute no frames.
    """
    import yt_dlp
    import imageio_ffmpeg

    ffmpeg_bin = imageio_ffmpeg.get_ffmpeg_exe()
    os.makedirs(out_dir, exist_ok=True)
    work_dir = os.path.join(out_dir, "_video_tmp")
    os.makedirs(work_dir, exist_ok=True)

    try:
        print(f" yt-dlp downloading: {video_url}")
        ydl_opts = {
            "format": "worstvideo[height>=480]/worst",
            "outtmpl": os.path.join(work_dir, "%(id)s.%(ext)s"),
            "quiet": True,
            "no_warnings": True,
            "noplaylist": True,
            "extractor_args": {"youtube": {"player_client": ["android", "web"]}},
        }
        if cookies_from_browser:
            ydl_opts["cookiesfrombrowser"] = (cookies_from_browser,)
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=True)
        except Exception as e:
            print(f" yt-dlp failed: {e}")
            return []
        if not info:
            return []

        # A pure playlist URL yields a container dict whose "entries" are the videos;
        # its own "id" is the playlist id, which matches no downloaded file.
        is_playlist = info.get("entries") is not None
        out_frames = []
        for video in _iter_video_infos(info):
            entry_url = (video.get("webpage_url") or video_url) if is_playlist else video_url
            out_frames.extend(
                _frames_for_video(video, entry_url, work_dir, out_dir, ffmpeg_bin, fps, max_frames)
            )

        print(f" → extracted {len(out_frames)} frames")
        return out_frames
    finally:
        # Don't leave an empty scratch directory inside the bucket.
        try:
            os.rmdir(work_dir)
        except OSError:
            pass
|
|
|
|
| |
| |
| |
|
|
|
|
def qwen_yes_no(image_path: str, m4_url: str = M4_QWEN_URL, timeout: int = 30) -> tuple:
    """Ask the Qwen VL server whether *image_path* matches QWEN_FILTER_PROMPT.

    The image is converted to RGB and downscaled so its longest side is at most
    1024 px. It is then PNG-encoded and POSTed as a data URL to the
    OpenAI-style ``/v1/chat/completions`` endpoint at *m4_url*.

    Returns ``(verdict, raw_answer)``. verdict is "YES" / "NO" when the first
    word of the upper-cased reply contains it, "UNKNOWN" otherwise, or "ERROR"
    on any failure (raw_answer is then the error message). Never raises.
    """
    try:
        with Image.open(image_path) as src:
            img = src.convert("RGB")
        max_dim = 1024
        longest = max(img.size)
        if longest > max_dim:
            ratio = max_dim / longest
            # max(1, ...) stops an extreme aspect ratio from rounding an axis to 0 px,
            # which would make resize() raise.
            new_size = tuple(max(1, int(side * ratio)) for side in img.size)
            img = img.resize(new_size, Image.LANCZOS)
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        b64 = base64.b64encode(buf.getvalue()).decode()
        payload = {
            "model": QWEN_MODEL_PATH,
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
                    {"type": "text", "text": QWEN_FILTER_PROMPT},
                ],
            }],
            "max_tokens": 12, "temperature": 0,
        }
        req = urllib.request.Request(
            f"{m4_url}/v1/chat/completions",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            data = json.loads(r.read())
        ans = data["choices"][0]["message"]["content"].strip().upper()
        first = ans.split()[0].rstrip(".,") if ans else ""
        verdict = "YES" if "YES" in first else ("NO" if "NO" in first else "UNKNOWN")
        return verdict, ans
    except Exception as e:
        # Deliberately best-effort: failures are reported as the "ERROR" verdict.
        return "ERROR", str(e)
|
|
|
|
| |
| |
| |
|
|
|
|
def url_filename(url: str, source: str) -> str:
    """Derive a stable local filename for *url*: ``<source>_<sha1[:12]><ext>``.

    The extension is taken from the URL path (lower-cased). It falls back to
    ".jpg" when it is not one of IMAGE_EXTS.
    """
    digest = hashlib.sha1(url.encode()).hexdigest()[:12]
    _, suffix = os.path.splitext(urllib.parse.urlparse(url).path)
    suffix = suffix.lower()
    extension = suffix if suffix in IMAGE_EXTS else ".jpg"
    return f"{source}_{digest}{extension}"
|
|
|
|
def download_one(url: str, dest: str, timeout: int = 30) -> tuple:
    """Fetch *url* and save the response body to *dest*.

    Returns ``(ok, nbytes, error)``:
      - ``(True, size, None)`` on success;
      - ``(False, 0, "too small")`` when the body is under 1024 bytes;
      - ``(False, 0, message)`` on any other failure. Never raises.

    The body is written to a sibling temp file and renamed into place with
    ``os.replace``. An interrupted or failed write therefore never leaves a
    truncated *dest*, which a later run would otherwise skip as already present.
    """
    import threading

    # pid + thread id keeps concurrent downloads from sharing a temp name.
    tmp = f"{dest}.{os.getpid()}-{threading.get_ident()}.part"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = resp.read()
        if len(data) < 1024:
            return False, 0, "too small"
        with open(tmp, "wb") as f:
            f.write(data)
        os.replace(tmp, dest)
        return True, len(data), None
    except Exception as e:
        # Best-effort by design: report the failure, and don't leave the temp file behind.
        try:
            os.remove(tmp)
        except OSError:
            pass
        return False, 0, str(e)
|
|
|
|
def perceptual_hash(image_path: str) -> str:
    """8x8 average-hash for fast cross-source dedup.

    Converts the image to grayscale and downscales it to 8x8. Each pixel
    brighter than the mean sets one bit (row-major, first pixel = most
    significant bit), and the 64 bits are returned as 16 lowercase hex digits.
    Returns "" when the file cannot be opened or decoded.
    """
    try:
        # "with" closes the underlying file promptly instead of waiting for GC.
        with Image.open(image_path) as src:
            thumb = src.convert("L").resize((8, 8), Image.LANCZOS)
        pixels = list(thumb.getdata())
        avg = sum(pixels) / len(pixels)
        bits = "".join("1" if p > avg else "0" for p in pixels)
        return format(int(bits, 2), "016x")
    except Exception:
        return ""
|
|
|
|
| |
| |
| |
|
|
|
|
def _unique_new_hits(hits: list, seen_urls: set) -> list:
    """Drop hits whose URL is in *seen_urls* or appeared earlier in *hits* (first one wins)."""
    unique = {}
    for hit in hits:
        if hit["url"] not in seen_urls:
            unique.setdefault(hit["url"], hit)
    return list(unique.values())


def _drop_entry(entry: dict, reason: str, protected_paths: set) -> None:
    """Mark *entry* as dropped with *reason*; delete its file unless the path is protected."""
    entry["dropped"] = reason
    if entry["path"] in protected_paths:
        return
    try:
        os.unlink(entry["path"])
    except OSError:
        pass


def _write_json_atomic(path: str, obj) -> None:
    """Write *obj* as indented JSON via temp file + rename, so a crash never truncates *path*."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(obj, f, indent=2)
    os.replace(tmp, path)


def main():
    """Command-line entry point.

    Pipeline:
      1. collect candidate image URLs from DDG + Wikimedia for every --query;
      2. download URLs not already in the manifest, in parallel, into <out>/<bucket>/;
      3. extract frames from every --youtube URL into the same bucket;
      4. drop perceptual-hash duplicates and files PIL cannot open;
      5. with --filter, drop images Qwen does not answer YES for;
      6. append every processed entry (kept or dropped) to the manifest, and write
         gather/stats.json when running inside an experiment directory.
    """
    from datetime import datetime

    p = argparse.ArgumentParser()
    p.add_argument("--out", default="drone-dataset-v2", help="Image output root (shared across experiments)")
    p.add_argument("--bucket", required=True, help="Bucket subpath, e.g. positive/fiber_spool_drone")
    p.add_argument("--experiment", default="",
                   help="Optional experiment name; if set, creates experiments/<YYYY-MM-DD_HHMMSS>_<name>/")
    p.add_argument("--query", action="append", default=[],
                   help="Search query (repeatable). Hits DDG + Wikimedia.")
    p.add_argument("--youtube", action="append", default=[],
                   help="YouTube video URL or playlist URL (repeatable). Extracts frames.")
    p.add_argument("--fps", type=float, default=1.0, help="Frames per second to extract from videos")
    p.add_argument("--max-frames-per-video", type=int, default=200)
    p.add_argument("--max-per-query", type=int, default=100)
    p.add_argument("--workers", type=int, default=50, help="Parallel download threads")
    p.add_argument("--filter", action="store_true",
                   help="Run Qwen YES/NO filter on each downloaded image, skip NO")
    p.add_argument("--cookies-from-browser", default=None,
                   help="For YouTube: chrome|safari|firefox — use browser cookies for age-gated/login videos")
    args = p.parse_args()

    bucket_dir = os.path.join(args.out, args.bucket)
    os.makedirs(bucket_dir, exist_ok=True)

    # Experiment mode: manifest (and stats) live under <experiment_dir>/gather/ instead of <out>/.
    experiment_dir = None
    if args.experiment or "EXPERIMENT_DIR" in os.environ:
        from experiments import make_experiment_dir, write_readme, write_config, update_latest_symlink
        if "EXPERIMENT_DIR" in os.environ:
            experiment_dir = os.environ["EXPERIMENT_DIR"]
            os.makedirs(os.path.join(experiment_dir, "gather"), exist_ok=True)
        else:
            experiment_dir = make_experiment_dir(args.experiment)
            write_readme(
                experiment_dir,
                name=args.experiment,
                description=f"gather_v2 run: bucket={args.bucket}, queries={args.query}, youtube={len(args.youtube)} videos",
                params=vars(args),
            )
            write_config(experiment_dir, vars(args))
            update_latest_symlink(experiment_dir)
        manifest_path = os.path.join(experiment_dir, "gather", "manifest.json")
        print(f"Experiment dir: {experiment_dir}")
    else:
        manifest_path = os.path.join(args.out, "manifest.json")

    manifest = []
    if os.path.exists(manifest_path):
        with open(manifest_path) as f:
            manifest = json.load(f)
        print(f"Resumed: {len(manifest)} files in manifest")

    seen_urls = {m["url"] for m in manifest if "url" in m}
    seen_hashes = {m["phash"] for m in manifest if m.get("phash")}
    # Files still referenced by a kept entry from an earlier run. YouTube frames use
    # deterministic names, so re-processing a video rewrites these same paths; dropping
    # the new entry must not delete the file the old entry points at.
    prior_kept_paths = {m["path"] for m in manifest if m.get("path") and not m.get("dropped")}

    # 1. Search.
    web_hits = []
    for q in args.query:
        print(f"\n[search] {q!r}")
        ddg_results = ddg_search(q, max_results=args.max_per_query)
        wiki_results = wikimedia_search(q, max_results=args.max_per_query)
        print(f" DDG: {len(ddg_results)} Wikimedia: {len(wiki_results)}")
        web_hits.extend(ddg_results)
        web_hits.extend(wiki_results)

    # Dedup against the manifest AND within this run: the same URL can come back from
    # several queries or both engines. Two downloads to one dest would race, and the
    # phash pass would then "dedup" by deleting the only copy.
    web_hits = _unique_new_hits(web_hits, seen_urls)
    print(f"\n {len(web_hits)} new web URLs to download (after dedup)")

    # 2. Download.
    downloaded = []
    if web_hits:
        print(f"\n[download] {len(web_hits)} files via {args.workers} threads...")
        t0 = time.time()
        n_ok, n_skip, n_err = 0, 0, 0
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            futures = {}
            for hit in web_hits:
                dest = os.path.join(bucket_dir, url_filename(hit["url"], hit["source"]))
                if os.path.exists(dest):
                    n_skip += 1
                    continue
                futures[pool.submit(download_one, hit["url"], dest)] = (hit, dest)
            for fut in as_completed(futures):
                hit, dest = futures[fut]
                ok, nbytes, _err = fut.result()
                if ok:
                    n_ok += 1
                    downloaded.append({**hit, "path": dest, "bytes": nbytes})
                else:
                    n_err += 1
        elapsed = time.time() - t0
        print(f" downloaded: {n_ok} new, {n_skip} already on disk, {n_err} errors in {elapsed:.0f}s")

    # 3. YouTube frames.
    youtube_hits = []
    for video_url in args.youtube:
        print(f"\n[youtube] {video_url}")
        frames = youtube_extract_frames(
            video_url, bucket_dir,
            fps=args.fps, max_frames=args.max_frames_per_video,
            cookies_from_browser=args.cookies_from_browser,
        )
        youtube_hits.extend(frames)

    new_entries = downloaded + youtube_hits

    # 4. Perceptual-hash dedup (plus rejection of files PIL cannot decode, e.g. HTML error pages).
    if new_entries:
        print(f"\n[dedup] computing perceptual hashes...")
        kept_paths = set(prior_kept_paths)
        for entry in new_entries:
            phash = perceptual_hash(entry["path"])
            entry["phash"] = phash
            if not phash:
                _drop_entry(entry, "unreadable", kept_paths)
            elif phash in seen_hashes:
                _drop_entry(entry, "dup_phash", kept_paths)
            else:
                seen_hashes.add(phash)
                kept_paths.add(entry["path"])
        n_dup = sum(1 for e in new_entries if e.get("dropped") == "dup_phash")
        n_unreadable = sum(1 for e in new_entries if e.get("dropped") == "unreadable")
        print(f" dropped {n_dup} duplicates")
        if n_unreadable:
            print(f" dropped {n_unreadable} unreadable files")

    # 5. Optional Qwen filter.
    survivors = []
    for entry in new_entries:
        if entry.get("dropped"):
            continue
        if args.filter:
            verdict, raw = qwen_yes_no(entry["path"])
            entry["qwen_verdict"] = verdict
            entry["qwen_answer"] = raw
            if verdict != "YES":
                # NOTE: UNKNOWN and ERROR (e.g. server unreachable) are rejected like NO.
                _drop_entry(entry, f"qwen_{verdict}", prior_kept_paths)
                continue
        survivors.append(entry)

    # 6. Record every processed entry, kept or dropped, for provenance and resumability.
    for entry in new_entries:
        entry["bucket"] = args.bucket
        manifest.append(entry)
    os.makedirs(os.path.dirname(manifest_path) or ".", exist_ok=True)
    _write_json_atomic(manifest_path, manifest)

    if experiment_dir:
        stats = {
            "bucket": args.bucket,
            "queries": args.query,
            "youtube_urls": args.youtube,
            "web_hits_found": len(web_hits),
            "downloaded": len(downloaded),
            "youtube_frames": len(youtube_hits),
            "dropped_dup": sum(1 for e in new_entries if e.get("dropped") == "dup_phash"),
            "dropped_unreadable": sum(1 for e in new_entries if e.get("dropped") == "unreadable"),
            "dropped_qwen": sum(1 for e in new_entries if e.get("dropped", "").startswith("qwen")),
            "survivors": len(survivors),
            "filter_enabled": args.filter,
            "manifest_total": len(manifest),
            "completed_at": datetime.now().isoformat(timespec="seconds"),
        }
        stats_path = os.path.join(experiment_dir, "gather", "stats.json")
        with open(stats_path, "w") as f:
            json.dump(stats, f, indent=2)
        print(f" stats: {stats_path}")

    print("\n" + "=" * 60)
    print("DONE")
    print("=" * 60)
    print(f" bucket: {args.bucket}")
    print(f" web hits found: {len(web_hits)}")
    print(f" downloaded: {len(downloaded)}")
    print(f" youtube frames: {len(youtube_hits)}")
    if args.filter:
        n_yes = sum(1 for e in new_entries if e.get("qwen_verdict") == "YES")
        n_no = sum(1 for e in new_entries if e.get("qwen_verdict") == "NO")
        print(f" qwen filter: YES={n_yes} NO={n_no}")
    print(f" survivors: {len(survivors)}")
    print(f" manifest: {manifest_path} ({len(manifest)} total)")
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
|
|