#!/usr/bin/env python3 # /// script # requires-python = ">=3.10" # dependencies = [ # "torch", # "torchvision", # "transformers>=4.40.0", # "peft>=0.10.0", # "datasets>=2.18.0", # "accelerate", # "bitsandbytes", # "qwen-vl-utils", # "pillow", # "opencv-python-headless", # "huggingface_hub>=0.21.0", # "av", # ] # /// """ Test ALL frames for manual curation. Saves all results with images for human review. Does NOT auto-select - human curator will pick best examples. Run with: hf jobs uv run --flavor a10g-large --secrets HF_TOKEN test_all_frames_for_curation.py """ import os import cv2 import re import json import torch import base64 from io import BytesIO from PIL import Image, ImageDraw, ImageFont from pathlib import Path from typing import Optional, List, Tuple # ============================================================ # Config # ============================================================ UNIFIED_MODEL = "mmrech/pitvqa-qwen2vl-unified-v2" VIDEO_DATASET = "UCL-WEISS/PitVis-2023" VIDEO_CACHE = Path("/tmp/videos") VIDEO_CACHE.mkdir(exist_ok=True) OUTPUT_DIR = Path("./curation_review") OUTPUT_DIR.mkdir(exist_ok=True) # Test configurations - EXTENSIVE # Sample frames from each video at regular intervals VIDEOS_TO_TEST = ["video_01", "video_02", "video_03", "video_05", "video_06", "video_10", "video_15", "video_20"] FRAMES_PER_VIDEO = [200, 500, 800, 1200, 1800] # Sample at these frame indices # Targets to test POINT_TARGETS = ["suction device", "surgical instruments"] # Focus on main targets BBOX_TARGETS = ["suction device", "surgical instruments"] # ============================================================ # Setup # ============================================================ from huggingface_hub import login, HfApi, hf_hub_download hf_token = os.environ.get("HF_TOKEN") if hf_token: login(token=hf_token) print("โœ“ Logged in to HuggingFace") api = HfApi() # ============================================================ # Load Model # ============================================================ print("\n๐Ÿค– Loading model...") from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig from peft import PeftModel processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct", trust_remote_code=True) bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) base = Qwen2VLForConditionalGeneration.from_pretrained( "Qwen/Qwen2-VL-2B-Instruct", quantization_config=bnb_config, device_map="auto", trust_remote_code=True ) model = PeftModel.from_pretrained(base, UNIFIED_MODEL, adapter_name="stage1", subfolder="stage1") model.load_adapter(UNIFIED_MODEL, adapter_name="stage2", subfolder="stage2") print(f"โœ“ Model loaded") # ============================================================ # Helpers # ============================================================ def download_video(video_id: str) -> Optional[Path]: video_path = VIDEO_CACHE / f"{video_id}.mp4" if not video_path.exists(): try: downloaded = hf_hub_download( repo_id=VIDEO_DATASET, filename=f"videos/{video_id}.mp4", repo_type="dataset" ) import shutil shutil.copy(downloaded, video_path) except Exception as e: print(f" โš  Could not download {video_id}: {e}") return None return video_path def extract_frame(video_id: str, frame_idx: int) -> Optional[Image.Image]: video_path = download_video(video_id) if video_path is None: return None cap = cv2.VideoCapture(str(video_path)) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() cap.release() if ret: return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) return None def run_inference(image, prompt, adapter="stage1"): model.set_adapter(adapter) content = [{"type": "image", "image": image}, {"type": "text", "text": prompt}] messages = [{"role": "user", "content": content}] text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt").to(model.device) with torch.no_grad(): output = model.generate(**inputs, max_new_tokens=256, do_sample=False) response = processor.decode(output[0], skip_special_tokens=True) if "assistant" in response.lower(): response = response.split("assistant")[-1].strip() return response def extract_point(text) -> Tuple[Optional[float], Optional[float]]: match = re.search(r"", text) if match: return float(match.group(1)), float(match.group(2)) return None, None def extract_bbox(text) -> Optional[List[float]]: match = re.search(r"", text) if match: return [float(match.group(i)) for i in range(1, 5)] return None def draw_point_on_image(image: Image.Image, x: float, y: float, label: str) -> Image.Image: """Draw point marker on image for visualization.""" img = image.copy() draw = ImageDraw.Draw(img) w, h = img.size px, py = int(x * w / 100), int(y * h / 100) # Draw crosshair draw.ellipse([px-8, py-8, px+8, py+8], fill="red", outline="white", width=2) draw.line([px-20, py, px+20, py], fill="white", width=2) draw.line([px, py-20, px, py+20], fill="white", width=2) # Draw label draw.text((10, 10), f"{label}: ({x:.1f}, {y:.1f})", fill="white") return img def draw_bbox_on_image(image: Image.Image, bbox: List[float], label: str) -> Image.Image: """Draw bounding box on image for visualization.""" img = image.copy() draw = ImageDraw.Draw(img) w, h = img.size x1, y1, x2, y2 = [int(c * w / 100) if i % 2 == 0 else int(c * h / 100) for i, c in enumerate(bbox)] draw.rectangle([x1, y1, x2, y2], outline="lime", width=3) draw.text((10, 10), f"{label}: [{bbox[0]:.0f},{bbox[1]:.0f}]-[{bbox[2]:.0f},{bbox[3]:.0f}]", fill="white") return img # ============================================================ # Test All Frames # ============================================================ print("\n" + "=" * 60) print("๐Ÿงช TESTING ALL FRAMES FOR CURATION") print("=" * 60) all_results = [] for video_id in VIDEOS_TO_TEST: print(f"\n๐Ÿ“น Processing {video_id}...") for frame_idx in FRAMES_PER_VIDEO: frame = extract_frame(video_id, frame_idx) if frame is None: print(f" โš  Frame {frame_idx} failed") continue print(f" Frame {frame_idx}:") # Test pointing for target in POINT_TARGETS: prompt = f"Point to the {target} in this surgical image." response = run_inference(frame, prompt, adapter="stage1") x, y = extract_point(response) success = x is not None and 0 <= x <= 100 and 0 <= y <= 100 result = { "id": f"{video_id}_{frame_idx}_point_{target.replace(' ', '_')}", "video_id": video_id, "frame_idx": frame_idx, "task": "point", "target": target, "response": response, "x": x, "y": y, "success": success, } all_results.append(result) # Save visualization if success: viz = draw_point_on_image(frame, x, y, target) viz_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_point_{target.replace(' ', '_')}.jpg" viz.save(viz_path, quality=90) status = "โœ…" if success else "โŒ" coords = f"({x:.1f}, {y:.1f})" if success else "FAILED" print(f" {status} Point {target}: {coords}") # Test bbox for target in BBOX_TARGETS: prompt = f"Draw a bounding box around the {target}." response = run_inference(frame, prompt, adapter="stage2") bbox = extract_bbox(response) success = bbox is not None and all(0 <= c <= 100 for c in bbox) result = { "id": f"{video_id}_{frame_idx}_bbox_{target.replace(' ', '_')}", "video_id": video_id, "frame_idx": frame_idx, "task": "bbox", "target": target, "response": response, "bbox": bbox, "success": success, } all_results.append(result) # Save visualization if success: viz = draw_bbox_on_image(frame, bbox, target) viz_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_bbox_{target.replace(' ', '_')}.jpg" viz.save(viz_path, quality=90) status = "โœ…" if success else "โŒ" coords = f"[{bbox[0]:.0f}-{bbox[2]:.0f}]x[{bbox[1]:.0f}-{bbox[3]:.0f}]" if success else "FAILED" print(f" {status} BBox {target}: {coords}") # Also save raw frame for reference raw_path = OUTPUT_DIR / f"{video_id}_{frame_idx}_raw.jpg" frame.save(raw_path, quality=90) # ============================================================ # Save Results # ============================================================ print("\n" + "=" * 60) print("๐Ÿ’พ SAVING FOR CURATION") print("=" * 60) # Save all results as JSON with open(OUTPUT_DIR / "all_results.json", "w") as f: json.dump(all_results, f, indent=2) # Summary successful = [r for r in all_results if r["success"]] print(f"Total tests: {len(all_results)}") print(f"Successful: {len(successful)} ({100*len(successful)/len(all_results):.1f}%)") # Create curation index index_html = """ PitVQA Curation Review

PitVQA Curation Review

Review these results and note which ones are good examples.

""" for r in successful: img_name = f"{r['id']}.jpg" index_html += f"""

{r['video_id']} f{r['frame_idx']}
{r['task']}: {r['target']}
""" index_html += "" with open(OUTPUT_DIR / "index.html", "w") as f: f.write(index_html) # Upload to HuggingFace as dataset for review print("\n๐Ÿ“ค Uploading for review...") try: # Create/upload to a review dataset REVIEW_REPO = "mmrech/pitvqa-curation-review" api.create_repo(REVIEW_REPO, repo_type="dataset", exist_ok=True) api.upload_folder( folder_path=str(OUTPUT_DIR), repo_id=REVIEW_REPO, repo_type="dataset" ) print(f"โœ“ Uploaded to https://huggingface.co/datasets/{REVIEW_REPO}") except Exception as e: print(f"โš  Upload error: {e}") print("\nโœ… DONE!") print(f"Review the results at: https://huggingface.co/datasets/mmrech/pitvqa-curation-review") print("Then tell me which examples to use for the showcase.")