|
|
|
|
|
""" |
|
|
OCULUS Benchmark Evaluation Suite |
|
|
|
|
|
Evaluates Oculus on multiple vision-language benchmarks: |
|
|
1. COCO Detection (mAP) |
|
|
2. Car Part Damage Detection |
|
|
3. Counting (Pixmo-style) |
|
|
4. VQA Accuracy |
|
|
5. RefCOCO Grounding (IoU) |
|
|
|
|
|
Inspired by Isaac model benchmarks. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import time |
|
|
import random |
|
|
from pathlib import Path |
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Dict, Tuple, Optional |
|
|
from collections import defaultdict |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from PIL import Image |
|
|
|
|
|
OCULUS_ROOT = Path(__file__).parent |
|
|
sys.path.insert(0, str(OCULUS_ROOT)) |
|
|
|
|
|
from oculus_unified_model import OculusForConditionalGeneration |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_iou(box1: List[float], box2: List[float]) -> float:
    """Return the intersection-over-union of two [x1, y1, x2, y2] boxes."""
    # Intersection rectangle: the overlap of the two boxes (may be empty).
    ix1, iy1 = max(box1[0], box2[0]), max(box1[1], box2[1])
    ix2, iy2 = min(box1[2], box2[2]), min(box1[3], box2[3])
    overlap = max(0, ix2 - ix1) * max(0, iy2 - iy1)

    # Union = sum of areas minus the overlap; epsilon guards division by zero.
    a1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    a2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    return overlap / (a1 + a2 - overlap + 1e-8)
|
|
|
|
|
|
|
|
def compute_ap(recalls: List[float], precisions: List[float]) -> float:
    """Compute Average Precision from a recall/precision curve.

    Pads the curve with sentinel endpoints, makes the precision curve a
    monotonically non-increasing envelope, then integrates precision over
    the recall axis.
    """
    # Sentinel endpoints: recall spans [0, 1]; precision drops to 0 at both ends.
    r = [0, *recalls, 1]
    p = [0, *precisions, 0]

    # Right-to-left running maximum turns precision into its upper envelope.
    for idx in reversed(range(len(p) - 1)):
        if p[idx + 1] > p[idx]:
            p[idx] = p[idx + 1]

    # Riemann sum of the envelope over the recall deltas.
    return sum((r[k] - r[k - 1]) * p[k] for k in range(1, len(r)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class COCODetectionBenchmark:
    """COCO Detection benchmark - computes mean IoU and class accuracy at IoU>0.5.

    Loads images and ground-truth boxes from a local COCO-format dataset and
    scores the model's "box" generation mode against them.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 500):
        """Load up to ``max_samples`` annotated images from ``data_dir``.

        Expects ``annotations/instances_train2017.json`` and an ``images/``
        directory under ``data_dir``. Crowd annotations and images missing
        from disk are skipped.
        """
        self.data_dir = Path(data_dir)
        self.max_samples = max_samples

        # NOTE(review): sibling benchmarks prefer the val2017 split with a
        # train2017 fallback; this one evaluates on train2017 only — confirm
        # that is intentional (evaluating on training data inflates scores).
        ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        # COCO category ids -> human-readable names, and -> contiguous
        # indices (the label space the model predicts in).
        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}
        self.cat_id_to_idx = {c['id']: i for i, c in enumerate(coco['categories'])}

        # Group annotations per image, dropping crowd regions.
        img_to_anns = defaultdict(list)
        for ann in coco['annotations']:
            if ann.get('iscrowd', 0):
                continue
            img_to_anns[ann['image_id']].append(ann)

        self.samples = []
        for img in coco['images']:
            if img['id'] not in img_to_anns:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            anns = img_to_anns[img['id']]
            boxes = []
            labels = []
            for ann in anns:
                if 'bbox' not in ann:
                    continue
                x, y, w, h = ann['bbox']
                # Convert COCO [x, y, w, h] pixels to normalized [x1, y1, x2, y2].
                boxes.append([
                    x / img['width'],
                    y / img['height'],
                    (x + w) / img['width'],
                    (y + h) / img['height']
                ])
                labels.append(self.cat_id_to_idx[ann['category_id']])

            if boxes:
                self.samples.append({
                    'path': str(img_path),
                    'boxes': boxes,
                    'labels': labels
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} COCO samples")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Run detection on every sample and return IoU/accuracy metrics.

        Returns a dict with ``mean_iou`` (best IoU per GT box, averaged),
        ``accuracy`` (fraction of GT boxes matched at IoU >= 0.5 with the
        correct class), and ``num_samples``.
        """
        print("\n📦 COCO Detection Benchmark")
        print("-" * 40)

        all_ious = []
        all_correct = []
        failures = 0  # samples that raised during inference/scoring

        for i, sample in enumerate(self.samples):
            if i % 50 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="box", prompt="Detect objects")

                gt_boxes = sample['boxes']
                pred_boxes = output.boxes
                pred_labels = [int(l) for l in output.labels]

                # Score each GT box by its single best-overlapping prediction.
                for gt_box, gt_label in zip(gt_boxes, sample['labels']):
                    best_iou = 0
                    is_correct = False

                    for pred_box, pred_label in zip(pred_boxes, pred_labels):
                        iou = compute_iou(gt_box, list(pred_box))
                        if iou > best_iou:
                            best_iou = iou
                            is_correct = (iou >= 0.5) and (pred_label == gt_label)

                    all_ious.append(best_iou)
                    all_correct.append(is_correct)

            except Exception as e:
                # Best-effort: one bad sample must not abort the run, but it
                # should not be silently invisible either.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures} samples failed during evaluation")

        mean_iou = np.mean(all_ious) if all_ious else 0
        accuracy = np.mean(all_correct) if all_correct else 0

        results = {
            'mean_iou': float(mean_iou),
            'accuracy': float(accuracy),
            'num_samples': len(self.samples)
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Accuracy (IoU>0.5 + correct class): {accuracy:.4f}")

        return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CarDamageBenchmark:
    """Car Part Damage detection benchmark from HuggingFace.

    Downloads the ``moondream/car_part_damage`` test split (when the
    ``datasets`` package is available) and scores box predictions by
    recall at IoU >= 0.5.
    """

    # Label vocabulary of the dataset (kept for reference / prompting).
    CAR_PART_LABELS = [
        'Back-bumper', 'Back-door', 'Back-wheel', 'Back-window', 'Back-windshield',
        'Fender', 'Front-bumper', 'Front-door', 'Front-wheel', 'Front-window',
        'Grille', 'Headlight', 'Hood', 'License-plate', 'Mirror', 'Quarter-panel',
        'Rocker-panel', 'Roof', 'Tail-light', 'Trunk', 'Windshield'
    ]

    def __init__(self, max_samples: int = 50):
        """Load up to ``max_samples`` test items; on failure ``samples`` stays empty."""
        self.max_samples = max_samples
        self.samples = []

        try:
            from datasets import load_dataset
            print(" Loading car_part_damage dataset...")
            ds = load_dataset("moondream/car_part_damage", split="test")

            for i, item in enumerate(ds):
                if i >= max_samples:
                    break

                boxes = []
                labels = []
                for ann in item['annotations']:
                    bbox = ann['bbox']
                    # Normalize to [0, 1]. NOTE(review): assumes bbox is
                    # already [x1, y1, x2, y2] in pixels — confirm against the
                    # dataset schema (COCO-style bboxes are [x, y, w, h]).
                    boxes.append([
                        bbox[0] / item['width'],
                        bbox[1] / item['height'],
                        bbox[2] / item['width'],
                        bbox[3] / item['height']
                    ])
                    labels.append(ann['category'])

                self.samples.append({
                    'image': item['image'],
                    'boxes': boxes,
                    'labels': labels,
                    'width': item['width'],
                    'height': item['height']
                })

            print(f" Loaded {len(self.samples)} car damage samples")

        except Exception as e:
            # Dataset download is optional; evaluate() reports the miss.
            print(f" ⚠️ Could not load dataset: {e}")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate on car damage detection.

        Returns mean best-IoU per ground-truth part plus recall@0.5; if the
        dataset never loaded, returns ``{'error': ...}`` instead.
        """
        print("\n🚗 Car Part Damage Benchmark")
        print("-" * 40)

        if not self.samples:
            return {'error': 'Dataset not loaded'}

        all_ious = []
        correct_parts = 0
        total_parts = 0
        failures = 0  # samples that raised during inference/scoring

        for i, sample in enumerate(self.samples):
            if i % 10 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = sample['image']
                output = model.generate(image, mode="box", prompt="Detect car parts and damage")

                pred_boxes = output.boxes

                # Each GT part is matched to its best-overlapping prediction.
                for gt_box in sample['boxes']:
                    total_parts += 1
                    best_iou = 0

                    for pred_box in pred_boxes:
                        iou = compute_iou(gt_box, list(pred_box))
                        best_iou = max(best_iou, iou)

                    all_ious.append(best_iou)
                    if best_iou >= 0.5:
                        correct_parts += 1

            except Exception as e:
                # Keep going, but surface the failure instead of hiding it.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures} samples failed during evaluation")

        mean_iou = np.mean(all_ious) if all_ious else 0
        recall = correct_parts / total_parts if total_parts > 0 else 0

        results = {
            'mean_iou': float(mean_iou),
            'recall@0.5': float(recall),
            'correct_parts': correct_parts,
            'total_parts': total_parts
        }

        print(f" Mean IoU: {mean_iou:.4f}")
        print(f" Recall@0.5: {recall:.4f} ({correct_parts}/{total_parts})")

        return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CountingBenchmark:
    """Object counting benchmark.

    Builds "How many X are in this image?" questions from COCO annotations,
    keeping images whose most frequent category appears 2-10 times.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Collect up to ``max_samples`` (image, category, count) questions."""
        self.data_dir = Path(data_dir)
        self.samples = []

        # Prefer the val split; fall back to train if val is absent.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Per-image category counts (crowd regions excluded).
        img_counts = defaultdict(lambda: defaultdict(int))
        for ann in coco['annotations']:
            if not ann.get('iscrowd', 0):
                img_counts[ann['image_id']][ann['category_id']] += 1

        for img in coco['images']:
            if img['id'] not in img_counts:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            counts = img_counts[img['id']]

            # Ask about the category with the most instances in the image.
            most_common_cat = max(counts.keys(), key=lambda k: counts[k])
            count = counts[most_common_cat]

            # Keep counts in a range that is neither trivial nor hopeless.
            if 2 <= count <= 10:
                self.samples.append({
                    'path': str(img_path),
                    'category': self.cat_id_to_name[most_common_cat],
                    'count': count
                })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} counting samples")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate counting accuracy (exact, within-1, and MAE).

        Samples whose response contains no parseable number are excluded
        from ``total``.
        """
        print("\n🔢 Counting Benchmark")
        print("-" * 40)

        exact_matches = 0
        within_one = 0
        total = 0
        errors = []
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                question = f"How many {sample['category']}s are in this image?"

                output = model.generate(image, mode="text", prompt=question)

                response = output.text.lower()
                gt_count = sample['count']

                # First pass: take the first numeric token in the response.
                # Surrounding punctuation is stripped so answers such as
                # "3." or "3," still parse. Only ValueError is expected from
                # int(); a bare except here would swallow real bugs.
                pred_count = None
                for word in response.split():
                    try:
                        pred_count = int(word.strip('.,;:!?'))
                        break
                    except ValueError:
                        pass

                # Second pass: spelled-out numbers ("three dogs").
                word_to_num = {
                    'zero': 0, 'one': 1, 'two': 2, 'three': 3, 'four': 4,
                    'five': 5, 'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10
                }
                if pred_count is None:
                    for word, num in word_to_num.items():
                        if word in response:
                            pred_count = num
                            break

                if pred_count is not None:
                    total += 1
                    if pred_count == gt_count:
                        exact_matches += 1
                    if abs(pred_count - gt_count) <= 1:
                        within_one += 1
                    errors.append(abs(pred_count - gt_count))

            except Exception as e:
                # Surface inference failures instead of silently dropping them.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures} samples failed during evaluation")

        accuracy = exact_matches / total if total > 0 else 0
        within1_acc = within_one / total if total > 0 else 0
        mae = np.mean(errors) if errors else 0

        results = {
            'exact_accuracy': float(accuracy),
            'within_one_accuracy': float(within1_acc),
            'mae': float(mae),
            'total': total
        }

        print(f" Exact Accuracy: {accuracy:.2%}")
        print(f" Within-1 Accuracy: {within1_acc:.2%}")
        print(f" Mean Absolute Error: {mae:.2f}")

        return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VQABenchmark:
    """Visual Question Answering benchmark.

    Generates simple existence questions ("Is there a <category> in this
    image?") from COCO annotations and scores answers by substring match.
    """

    def __init__(self, data_dir: str = "data/coco", max_samples: int = 200):
        """Collect up to ``max_samples`` question/answer pairs."""
        self.data_dir = Path(data_dir)

        self.samples = []

        # Prefer the val split; fall back to train if val is absent.
        ann_file = self.data_dir / "annotations" / "instances_val2017.json"
        if not ann_file.exists():
            ann_file = self.data_dir / "annotations" / "instances_train2017.json"

        with open(ann_file) as f:
            coco = json.load(f)

        self.cat_id_to_name = {c['id']: c['name'] for c in coco['categories']}

        # Which categories appear in each image.
        img_cats = defaultdict(set)
        for ann in coco['annotations']:
            img_cats[ann['image_id']].add(ann['category_id'])

        # Seeded RNG so the sampled questions are identical run-to-run;
        # the module-level random would make benchmark scores drift.
        rng = random.Random(0)

        for img in coco['images']:
            if img['id'] not in img_cats:
                continue

            img_path = self.data_dir / "images" / img['file_name']
            if not img_path.exists():
                continue

            cats = list(img_cats[img['id']])
            if cats:
                cat = rng.choice(cats)
                cat_name = self.cat_id_to_name[cat]

                # Question templates; only the first (yes/no existence) is
                # currently used — the open-ended one is kept for later.
                questions = [
                    (f"Is there a {cat_name} in this image?", "yes"),
                    (f"What objects are visible in this image?", cat_name),
                ]

                for q, a in questions[:1]:
                    self.samples.append({
                        'path': str(img_path),
                        'question': q,
                        'answer': a
                    })

            if len(self.samples) >= max_samples:
                break

        print(f" Loaded {len(self.samples)} VQA samples")

    def evaluate(self, model: OculusForConditionalGeneration) -> Dict:
        """Evaluate VQA accuracy via case-insensitive substring matching."""
        print("\n❓ VQA Benchmark")
        print("-" * 40)

        correct = 0
        total = 0
        failures = 0  # samples that raised during inference

        for i, sample in enumerate(self.samples):
            if i % 25 == 0:
                print(f" Progress: {i}/{len(self.samples)}")

            try:
                image = Image.open(sample['path']).convert('RGB')
                output = model.generate(image, mode="text", prompt=sample['question'])

                response = output.text.lower()
                answer = sample['answer'].lower()

                # Lenient scoring: the reference answer just has to appear
                # somewhere in the generated text.
                is_correct = answer in response

                if is_correct:
                    correct += 1
                total += 1

            except Exception as e:
                # Surface inference failures instead of silently dropping them.
                failures += 1
                print(f" ⚠️ Sample {i} failed: {e}")

        if failures:
            print(f" ⚠️ {failures} samples failed during evaluation")

        accuracy = correct / total if total > 0 else 0

        results = {
            'accuracy': float(accuracy),
            'correct': correct,
            'total': total
        }

        print(f" Accuracy: {accuracy:.2%} ({correct}/{total})")

        return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_benchmarks(model_path: str, benchmarks: Optional[List[str]] = None):
    """Run the selected benchmarks on the model at ``model_path``.

    Args:
        model_path: Directory containing the pretrained Oculus checkpoint
            (and optionally ``heads.pth`` with trained detection heads).
        benchmarks: Subset of ``['coco', 'car_damage', 'counting', 'vqa']``
            to run; ``None`` runs all of them.

    Returns:
        Dict mapping benchmark name to its results dict. Results are also
        written to ``<model_path>/benchmark_results.json``.
    """

    print("=" * 70)
    print("🔮 OCULUS BENCHMARK EVALUATION SUITE")
    print("=" * 70)
    print(f"Model: {model_path}")

    print("\n[Loading Model]")
    model = OculusForConditionalGeneration.from_pretrained(model_path)

    # Restore trained detection/point heads if they were saved separately.
    # (torch is already imported at module level; no local import needed.)
    heads_path = Path(model_path) / "heads.pth"
    if heads_path.exists():
        # map_location lets heads saved on a GPU load on CPU-only hosts;
        # load_state_dict then copies tensors onto the model's own device.
        heads = torch.load(heads_path, map_location="cpu")
        model.detection_head.load_state_dict(heads['detection'])
        model.point_head.load_state_dict(heads['point'])
        print(" ✓ Loaded trained detection heads")

    model.vision_encoder.load_encoders()
    model.load_language_model()

    all_results = {}

    if benchmarks is None:
        benchmarks = ['coco', 'car_damage', 'counting', 'vqa']

    if 'coco' in benchmarks:
        bench = COCODetectionBenchmark(max_samples=100)
        all_results['coco_detection'] = bench.evaluate(model)

    if 'car_damage' in benchmarks:
        bench = CarDamageBenchmark(max_samples=50)
        all_results['car_damage'] = bench.evaluate(model)

    if 'counting' in benchmarks:
        bench = CountingBenchmark(max_samples=100)
        all_results['counting'] = bench.evaluate(model)

    if 'vqa' in benchmarks:
        bench = VQABenchmark(max_samples=100)
        all_results['vqa'] = bench.evaluate(model)

    # Human-readable summary.
    print("\n" + "=" * 70)
    print("📊 BENCHMARK SUMMARY")
    print("=" * 70)

    for name, results in all_results.items():
        print(f"\n{name}:")
        for k, v in results.items():
            if isinstance(v, float):
                print(f" {k}: {v:.4f}")
            else:
                print(f" {k}: {v}")

    # Persist machine-readable results next to the checkpoint.
    results_path = Path(model_path) / "benchmark_results.json"
    with open(results_path, "w") as f:
        json.dump(all_results, f, indent=2)
    print(f"\n💾 Results saved to: {results_path}")

    return all_results
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Command-line entry point: pick a checkpoint directory and, optionally,
    # a subset of benchmarks to run (default: all of them).
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--model", default="checkpoints/oculus_detection/final")
    arg_parser.add_argument("--benchmarks", nargs="+", default=None)
    cli_args = arg_parser.parse_args()

    run_benchmarks(cli_args.model, cli_args.benchmarks)
|
|
|