annotation-qa-env / server /corruption.py
k3tikvats
initial commit
8b4d6a8
"""
Annotation corruption strategies for the Annotation QA Environment.
Takes gold-standard annotations and systematically corrupts them to create
training data with known errors. The corruption is deterministic given a seed.
Corruption types by difficulty:
- Task 1 (Easy): Obvious bbox errors — expand, shift, shrink, delete, add spurious
- Task 2 (Medium): bbox + class errors — similar class confusion, boundary errors
- Task 3 (Hard): Cross-image inconsistencies + subtle errors
"""
import copy
import random
from typing import Dict, List, Tuple
# Class confusion map — used for "similar class" corruption.
# Every label maps to the other members of its confusable group, in group
# order, so the relation is symmetric by construction.
SIMILAR_CLASSES: Dict[str, List[str]] = {
    label: [other for other in group if other != label]
    for group in (
        ("car", "truck", "van"),
        ("person", "cyclist"),
        ("dog", "cat"),
        ("bicycle", "motorcycle"),
        ("tree", "bush"),
        ("building", "house"),
        ("traffic_light", "street_light"),
        ("bench", "chair"),
    )
    for label in group
}

# Pool of labels drawn from for "wrong category" corruption and for the
# class of spurious annotations.
ALL_CLASSES = [
    "car",
    "truck",
    "person",
    "bicycle",
    "dog",
    "cat",
    "tree",
    "building",
    "traffic_light",
    "bench",
]
def _clamp(val: float, lo: float = 0.0, hi: float = 1.0) -> float:
    """Return ``val`` limited to the range ``[lo, hi]``.

    If the bounds cross (``hi < lo``) the lower bound wins, because the
    outer ``max`` is applied last.
    """
    return max(lo, min(hi, val))


def _clamp_bbox(bbox: List[float]) -> List[float]:
    """Force an ``[x, y, w, h]`` bbox inside the unit square.

    The box keeps a minimum side length of 0.02 and every coordinate is
    rounded to 4 decimal places.

    Args:
        bbox: ``[x, y, w, h]`` with the origin at ``(x, y)``.

    Returns:
        A new ``[x, y, w, h]`` list with ``0 <= x``, ``0 <= y``,
        ``w >= 0.02``, ``h >= 0.02`` and ``x + w <= 1``, ``y + h <= 1``
        (up to rounding).
    """
    min_size = 0.02
    x, y, w, h = bbox
    # Cap the origin at 1 - min_size so that a minimum-sized box still fits;
    # clamping it to 1.0 would let the minimum width/height push the box
    # past the right/bottom edge of the image.
    x = _clamp(x, 0.0, 1.0 - min_size)
    y = _clamp(y, 0.0, 1.0 - min_size)
    w = _clamp(w, min_size, 1.0 - x)
    h = _clamp(h, min_size, 1.0 - y)
    return [round(x, 4), round(y, 4), round(w, 4), round(h, 4)]
def expand_bbox(bbox: List[float], factor: float) -> List[float]:
    """Scale a bbox about its centre (e.g. ``factor=1.5`` makes it 50% larger).

    Args:
        bbox: ``[x, y, w, h]`` box to scale.
        factor: Multiplier applied to both width and height.

    Returns:
        The scaled box after it has been passed through ``_clamp_bbox``.
    """
    left, top, width, height = bbox
    center_x = left + width / 2
    center_y = top + height / 2
    scaled_w = width * factor
    scaled_h = height * factor
    # Re-derive the origin so the centre stays where it was before scaling.
    scaled = [
        center_x - scaled_w / 2,
        center_y - scaled_h / 2,
        scaled_w,
        scaled_h,
    ]
    return _clamp_bbox(scaled)
def shift_bbox(bbox: List[float], dx_frac: float, dy_frac: float) -> List[float]:
    """Translate a bbox by fractions of its own width and height.

    Args:
        bbox: ``[x, y, w, h]`` box to move.
        dx_frac: Horizontal offset as a fraction of the box width.
        dy_frac: Vertical offset as a fraction of the box height.

    Returns:
        The moved box (size unchanged before clamping) after it has been
        passed through ``_clamp_bbox``.
    """
    left, top, width, height = bbox
    moved = [left + width * dx_frac, top + height * dy_frac, width, height]
    return _clamp_bbox(moved)
def shrink_bbox(bbox: List[float], factor: float) -> List[float]:
    """Shrink a bbox about its centre; callers pass ``factor < 1.0``.

    The factor is not validated: this simply delegates to ``expand_bbox``,
    which performs the same centre-preserving scaling for any factor.
    """
    shrunk = expand_bbox(bbox, factor)
    return shrunk
def generate_spurious_annotation(
    existing_bboxes: List[List[float]], rng: random.Random
) -> Dict:
    """Generate a random annotation that doesn't overlap much with existing ones.

    Up to 20 candidate boxes (each side between 0.05 and 0.20, fully inside
    the unit square) are sampled; the first whose highest IoU against
    ``existing_bboxes`` is below 0.3 is returned. If none qualifies, a
    0.1 x 0.1 box at a random position is returned regardless of overlap.

    Args:
        existing_bboxes: ``[x, y, w, h]`` boxes already present.
        rng: Random source; every draw goes through it.

    Returns:
        A dict with keys ``"bbox"`` and ``"class_label"`` (a member of
        ``ALL_CLASSES``). No ``"id"`` is assigned here.
    """
    # Kept as a function-scope import like the original (presumably to avoid
    # an import cycle with the grader module — confirm), but executed once
    # per call instead of once per loop iteration.
    from .grader import compute_iou

    for _ in range(20):  # try up to 20 times
        w = rng.uniform(0.05, 0.20)
        h = rng.uniform(0.05, 0.20)
        x = rng.uniform(0.0, 1.0 - w)
        y = rng.uniform(0.0, 1.0 - h)
        bbox = [round(x, 4), round(y, 4), round(w, 4), round(h, 4)]
        # An empty ``existing_bboxes`` yields 0.0, so the first candidate wins.
        max_iou = max(
            (compute_iou(bbox, eb) for eb in existing_bboxes), default=0.0
        )
        if max_iou < 0.3:
            return {"bbox": bbox, "class_label": rng.choice(ALL_CLASSES)}

    # Fallback: place it anyway. The origin is capped at 0.8, so the
    # 0.1-sized box always stays inside the image.
    fallback_x = round(rng.uniform(0.0, 0.8), 4)
    fallback_y = round(rng.uniform(0.0, 0.8), 4)
    return {
        "bbox": [fallback_x, fallback_y, 0.1, 0.1],
        "class_label": rng.choice(ALL_CLASSES),
    }
def _relabel(
    ann: Dict, rng: random.Random, prefer_similar: bool
) -> Tuple[str, str, bool]:
    """Overwrite ``ann["class_label"]`` with a wrong label.

    With ``prefer_similar`` the new label is drawn from the
    ``SIMILAR_CLASSES`` entry of the current label when one exists; in every
    other case it is drawn from the ``ALL_CLASSES`` members that differ from
    the current label.

    Returns:
        ``(old_label, new_label, used_similar)``.
    """
    old_cls = ann["class_label"]
    similar = SIMILAR_CLASSES.get(old_cls, []) if prefer_similar else []
    if similar:
        ann["class_label"] = rng.choice(similar)
    else:
        candidates = [c for c in ALL_CLASSES if c != old_cls]
        ann["class_label"] = rng.choice(candidates)
    return old_cls, ann["class_label"], bool(similar)


def _corrupt_easy(ann: Dict, rng: random.Random, log: List[str]) -> bool:
    """Apply one obvious bbox corruption; return True if ``ann`` must be deleted."""
    action = rng.choice(["expand", "shift", "shrink", "delete"])
    if action == "expand":
        factor = rng.uniform(1.5, 2.5)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        log.append(f"Expanded ann {ann['id']} by {factor:.1f}x")
    elif action == "shift":
        dx = rng.uniform(-0.4, 0.4)
        dy = rng.uniform(-0.4, 0.4)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        log.append(f"Shifted ann {ann['id']} by ({dx:.2f}, {dy:.2f})")
    elif action == "shrink":
        factor = rng.uniform(0.3, 0.6)
        ann["bbox"] = shrink_bbox(ann["bbox"], factor)
        log.append(f"Shrunk ann {ann['id']} by {factor:.1f}x")
    elif action == "delete":
        log.append(f"Deleted ann {ann['id']} ({ann['class_label']})")
        return True
    return False


def _corrupt_medium(ann: Dict, rng: random.Random, log: List[str]) -> bool:
    """Apply one bbox or class corruption; return True if ``ann`` must be deleted."""
    action = rng.choice([
        "expand", "shift", "wrong_similar_class",
        "wrong_different_class", "delete",
    ])
    if action == "expand":
        factor = rng.uniform(1.3, 2.0)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        log.append(f"Expanded ann {ann['id']} by {factor:.1f}x")
    elif action == "shift":
        dx = rng.uniform(-0.3, 0.3)
        dy = rng.uniform(-0.3, 0.3)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        log.append(f"Shifted ann {ann['id']}")
    elif action == "wrong_similar_class":
        old_cls, new_cls, _ = _relabel(ann, rng, prefer_similar=True)
        log.append(f"Changed ann {ann['id']} class: {old_cls} → {new_cls}")
    elif action == "wrong_different_class":
        old_cls, new_cls, _ = _relabel(ann, rng, prefer_similar=False)
        log.append(
            f"Changed ann {ann['id']} class: {old_cls} → {new_cls} (wrong category)"
        )
    elif action == "delete":
        log.append(f"Deleted ann {ann['id']} ({ann['class_label']})")
        return True
    return False


def _corrupt_hard(ann: Dict, rng: random.Random, log: List[str]) -> bool:
    """Apply one subtle corruption; return True if ``ann`` must be deleted."""
    # "wrong_similar_class" is listed twice so it is drawn twice as often.
    action = rng.choice([
        "subtle_shift", "wrong_similar_class",
        "wrong_similar_class", "delete", "subtle_expand",
    ])
    if action == "subtle_shift":
        dx = rng.uniform(-0.15, 0.15)
        dy = rng.uniform(-0.15, 0.15)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        log.append(f"Subtly shifted ann {ann['id']}")
    elif action == "subtle_expand":
        factor = rng.uniform(1.15, 1.4)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        log.append(f"Subtly expanded ann {ann['id']}")
    elif action == "wrong_similar_class":
        old_cls, new_cls, used_similar = _relabel(ann, rng, prefer_similar=True)
        if used_similar:
            log.append(f"Changed ann {ann['id']}: {old_cls} → {new_cls} (similar)")
        else:
            log.append(f"Changed ann {ann['id']}: {old_cls} → {new_cls}")
    elif action == "delete":
        log.append(f"Deleted ann {ann['id']}")
        return True
    return False


# difficulty -> (corruption rate, minimum number of annotations to corrupt,
#                (min, max) number of spurious additions, per-annotation
#                mutator, whether the spurious log line includes the class)
_CORRUPTION_PROFILES = {
    "easy": (0.35, 1, (2, 3), _corrupt_easy, True),
    "medium": (0.30, 2, (3, 4), _corrupt_medium, True),
    "hard": (0.25, 2, (2, 3), _corrupt_hard, False),
}


def corrupt_annotations(
    gold_annotations: List[Dict],
    difficulty: str,
    seed: int,
) -> Tuple[List[Dict], List[str]]:
    """
    Corrupt gold annotations based on difficulty level.

    A deep copy of ``gold_annotations`` is modified; the input is left
    untouched. All randomness is drawn from ``random.Random(seed)``. Each
    annotation is read as a dict with ``"id"``, ``"bbox"`` and
    ``"class_label"`` keys.

    Steps for "easy", "medium" and "hard":
      1. Pick a random subset of annotations (35% / 30% / 25% of them, but at
         least 1 / 2 / 2 when that many exist) and apply one randomly chosen
         corruption to each; "delete" removes the annotation.
      2. Append 2-3 / 3-4 / 2-3 spurious annotations with fresh ids.

    Any other ``difficulty`` value returns the uncorrupted copy and an empty
    log.

    Returns:
        (corrupted_annotations, corruption_log)
        corruption_log: list of strings describing what was corrupted (for debugging)
    """
    rng = random.Random(seed)
    corrupted = copy.deepcopy(gold_annotations)
    log: List[str] = []

    profile = _CORRUPTION_PROFILES.get(difficulty)
    if profile is None:
        return corrupted, log
    rate, min_corrupt, spurious_range, mutate, log_spurious_class = profile

    # Choose which annotations to corrupt.
    n_corrupt = max(min_corrupt, int(len(corrupted) * rate))
    indices = list(range(len(corrupted)))
    rng.shuffle(indices)
    for idx in indices[:n_corrupt]:
        if mutate(corrupted[idx], rng, log):
            corrupted[idx] = None  # mark for removal
    corrupted = [a for a in corrupted if a is not None]

    # Add spurious annotations. Their ids start above the highest *gold* id
    # rather than the highest surviving id, so a spurious annotation can
    # never reuse the id of a gold annotation that was just deleted.
    existing_bboxes = [a["bbox"] for a in corrupted]
    n_spurious = rng.randint(*spurious_range)
    next_id = max((a["id"] for a in gold_annotations), default=0) + 1
    for offset in range(n_spurious):
        spur = generate_spurious_annotation(existing_bboxes, rng)
        spur["id"] = next_id + offset
        corrupted.append(spur)
        existing_bboxes.append(spur["bbox"])
        if log_spurious_class:
            log.append(f"Added spurious ann {spur['id']} ({spur['class_label']})")
        else:
            log.append(f"Added spurious ann {spur['id']}")

    return corrupted, log