Spaces:
Sleeping
Sleeping
| """ | |
| Annotation corruption strategies for the Annotation QA Environment. | |
| Takes gold-standard annotations and systematically corrupts them to create | |
| training data with known errors. The corruption is deterministic given a seed. | |
| Corruption types by difficulty: | |
| - Task 1 (Easy): Obvious bbox errors — expand, shift, shrink, delete, add spurious | |
| - Task 2 (Medium): bbox + class errors — similar class confusion, boundary errors | |
| - Task 3 (Hard): Cross-image inconsistencies + subtle errors | |
| """ | |
| import copy | |
| import random | |
| from typing import Dict, List, Tuple | |
# Class confusion map — used for "similar class" corruption.
# Each label maps to the other members of its confusion group, in group order
# (the list order matters because callers sample from it with a seeded RNG).
SIMILAR_CLASSES: Dict[str, List[str]] = {
    label: [other for other in group if other != label]
    for group in (
        ("car", "truck", "van"),
        ("person", "cyclist"),
        ("dog", "cat"),
        ("bicycle", "motorcycle"),
        ("tree", "bush"),
        ("building", "house"),
        ("traffic_light", "street_light"),
        ("bench", "chair"),
    )
    for label in group
}
# Label pool for "wrong category" corruption: replacement labels are drawn
# from this list. Order is significant because it is sampled with a seeded RNG.
ALL_CLASSES: List[str] = [
    "car",
    "truck",
    "person",
    "bicycle",
    "dog",
    "cat",
    "tree",
    "building",
    "traffic_light",
    "bench",
]
def _clamp(val: float, lo: float = 0.0, hi: float = 1.0) -> float:
    """Return ``val`` limited to the interval [lo, hi].

    If ``lo > hi`` the lower bound wins and ``lo`` is returned.
    """
    return max(lo, min(hi, val))


def _clamp_bbox(bbox: List[float], min_size: float = 0.02) -> List[float]:
    """Ensure a bbox stays within the [0, 1] image bounds.

    Args:
        bbox: ``[x, y, w, h]`` in normalised image coordinates.
        min_size: Smallest width/height the returned box may have.

    Returns:
        A new ``[x, y, w, h]`` list, each value rounded to 4 decimals, with
        ``x + w <= 1`` and ``y + h <= 1`` (up to that rounding).
    """
    x, y, w, h = bbox
    # Cap the origin at 1 - min_size. Otherwise an origin closer than
    # min_size to the far edge leaves less than min_size of room, the size
    # clamp below falls back to its lower bound, and the box pokes out of
    # the image (e.g. x = 0.995 used to yield x + w = 1.015).
    x = _clamp(x, 0.0, 1.0 - min_size)
    y = _clamp(y, 0.0, 1.0 - min_size)
    w = _clamp(w, min_size, 1.0 - x)
    h = _clamp(h, min_size, 1.0 - y)
    return [round(x, 4), round(y, 4), round(w, 4), round(h, 4)]
def expand_bbox(bbox: List[float], factor: float) -> List[float]:
    """Scale a bbox about its centre by ``factor`` (e.g. 1.5 = 50% larger).

    Args:
        bbox: ``[x, y, w, h]`` of the box to scale.
        factor: Multiplier applied to both width and height.

    Returns:
        The scaled box after it has been passed through ``_clamp_bbox``.
    """
    left, top, width, height = bbox
    centre_x, centre_y = left + width / 2, top + height / 2
    scaled_w, scaled_h = width * factor, height * factor
    # Re-derive the top-left corner so the centre stays where it was.
    scaled = [
        centre_x - scaled_w / 2,
        centre_y - scaled_h / 2,
        scaled_w,
        scaled_h,
    ]
    return _clamp_bbox(scaled)
def shift_bbox(bbox: List[float], dx_frac: float, dy_frac: float) -> List[float]:
    """Translate a bbox by a fraction of its own size.

    Args:
        bbox: ``[x, y, w, h]`` of the box to move.
        dx_frac: Horizontal offset expressed as a fraction of the box width.
        dy_frac: Vertical offset expressed as a fraction of the box height.

    Returns:
        The moved box (size unchanged) after passing through ``_clamp_bbox``.
    """
    left, top, width, height = bbox
    moved = [left + width * dx_frac, top + height * dy_frac, width, height]
    return _clamp_bbox(moved)
def shrink_bbox(bbox: List[float], factor: float) -> List[float]:
    """Shrink a bbox about its centre (pass ``factor`` < 1.0).

    This is ``expand_bbox`` under a more descriptive name; the factor is
    forwarded unchanged.
    """
    shrunk = expand_bbox(bbox, factor)
    return shrunk
def generate_spurious_annotation(
    existing_bboxes: List[List[float]], rng: random.Random
) -> Dict:
    """Generate a random annotation that doesn't overlap much with existing ones.

    Up to 20 candidate boxes (width/height in [0.05, 0.20], fully inside the
    image) are drawn from ``rng``; the first whose highest IoU against
    ``existing_bboxes`` is below 0.3 is returned. If none qualifies, a
    0.1 x 0.1 box is placed at a random position regardless of overlap.

    Args:
        existing_bboxes: ``[x, y, w, h]`` boxes the new one should avoid.
        rng: Random source; all randomness comes from it, so the result is
            reproducible for a given RNG state.

    Returns:
        A dict with ``"bbox"`` and ``"class_label"`` keys (no ``"id"``).
    """
    # Kept as a function-scope import as in the original (presumably to avoid
    # a circular import with the grader — confirm), but resolved once per call
    # instead of once per placement attempt.
    from .grader import compute_iou

    max_attempts = 20
    overlap_limit = 0.3

    for _ in range(max_attempts):
        # Draw order (w, h, x, y) is part of the seeded behaviour; keep it.
        w = rng.uniform(0.05, 0.20)
        h = rng.uniform(0.05, 0.20)
        x = rng.uniform(0.0, 1.0 - w)
        y = rng.uniform(0.0, 1.0 - h)
        bbox = [round(x, 4), round(y, 4), round(w, 4), round(h, 4)]

        # default=0.0 covers an empty existing_bboxes list.
        worst_overlap = max(
            (compute_iou(bbox, other) for other in existing_bboxes), default=0.0
        )
        if worst_overlap < overlap_limit:
            return {"bbox": bbox, "class_label": rng.choice(ALL_CLASSES)}

    # Fallback: place it anyway. Origin <= 0.8 keeps the 0.1-sized box inside.
    return {
        "bbox": [round(rng.uniform(0.0, 0.8), 4), round(rng.uniform(0.0, 0.8), 4), 0.1, 0.1],
        "class_label": rng.choice(ALL_CLASSES),
    }
def _pick_indices(count: int, rate: float, minimum: int, rng: random.Random) -> List[int]:
    """Return the shuffled indices of the annotations selected for corruption."""
    order = list(range(count))
    rng.shuffle(order)
    return order[: max(minimum, int(count * rate))]


def _confuse_class(ann: Dict, rng: random.Random, prefer_similar: bool) -> Tuple[str, str, bool]:
    """Relabel ``ann`` in place; return (old label, new label, drawn from SIMILAR_CLASSES)."""
    old_cls = ann["class_label"]
    similar = SIMILAR_CLASSES.get(old_cls, []) if prefer_similar else []
    # Labels without a confusion entry fall back to any other known class.
    pool = similar or [c for c in ALL_CLASSES if c != old_cls]
    new_cls = rng.choice(pool)
    ann["class_label"] = new_cls
    return old_cls, new_cls, bool(similar)


def _corrupt_easy(ann: Dict, rng: random.Random) -> Tuple[bool, str]:
    """Task 1: obvious bbox errors only (no class changes). Returns (keep, log line)."""
    action = rng.choice(["expand", "shift", "shrink", "delete"])
    if action == "expand":
        factor = rng.uniform(1.5, 2.5)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        return True, f"Expanded ann {ann['id']} by {factor:.1f}x"
    if action == "shift":
        dx = rng.uniform(-0.4, 0.4)
        dy = rng.uniform(-0.4, 0.4)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        return True, f"Shifted ann {ann['id']} by ({dx:.2f}, {dy:.2f})"
    if action == "shrink":
        factor = rng.uniform(0.3, 0.6)
        ann["bbox"] = shrink_bbox(ann["bbox"], factor)
        return True, f"Shrunk ann {ann['id']} by {factor:.1f}x"
    return False, f"Deleted ann {ann['id']} ({ann['class_label']})"


def _corrupt_medium(ann: Dict, rng: random.Random) -> Tuple[bool, str]:
    """Task 2: bbox errors plus class confusion. Returns (keep, log line)."""
    action = rng.choice([
        "expand", "shift", "wrong_similar_class",
        "wrong_different_class", "delete",
    ])
    if action == "expand":
        factor = rng.uniform(1.3, 2.0)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        return True, f"Expanded ann {ann['id']} by {factor:.1f}x"
    if action == "shift":
        dx = rng.uniform(-0.3, 0.3)
        dy = rng.uniform(-0.3, 0.3)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        return True, f"Shifted ann {ann['id']}"
    if action == "wrong_similar_class":
        old_cls, new_cls, _ = _confuse_class(ann, rng, prefer_similar=True)
        return True, f"Changed ann {ann['id']} class: {old_cls} → {new_cls}"
    if action == "wrong_different_class":
        old_cls, new_cls, _ = _confuse_class(ann, rng, prefer_similar=False)
        return True, f"Changed ann {ann['id']} class: {old_cls} → {new_cls} (wrong category)"
    return False, f"Deleted ann {ann['id']} ({ann['class_label']})"


def _corrupt_hard(ann: Dict, rng: random.Random) -> Tuple[bool, str]:
    """Task 3: subtle bbox errors plus class confusion. Returns (keep, log line)."""
    # "wrong_similar_class" is listed twice on purpose: it doubles its odds.
    action = rng.choice([
        "subtle_shift", "wrong_similar_class",
        "wrong_similar_class", "delete", "subtle_expand",
    ])
    if action == "subtle_shift":
        dx = rng.uniform(-0.15, 0.15)
        dy = rng.uniform(-0.15, 0.15)
        ann["bbox"] = shift_bbox(ann["bbox"], dx, dy)
        return True, f"Subtly shifted ann {ann['id']}"
    if action == "subtle_expand":
        factor = rng.uniform(1.15, 1.4)
        ann["bbox"] = expand_bbox(ann["bbox"], factor)
        return True, f"Subtly expanded ann {ann['id']}"
    if action == "wrong_similar_class":
        old_cls, new_cls, was_similar = _confuse_class(ann, rng, prefer_similar=True)
        suffix = " (similar)" if was_similar else ""
        return True, f"Changed ann {ann['id']}: {old_cls} → {new_cls}{suffix}"
    return False, f"Deleted ann {ann['id']}"


def _add_spurious(
    corrupted: List[Dict],
    rng: random.Random,
    count_range: Tuple[int, int],
    first_id: int,
    log: List[str],
    log_class: bool,
) -> None:
    """Append a random number of spurious annotations to ``corrupted`` and log each one."""
    existing_bboxes = [a["bbox"] for a in corrupted]
    n_spurious = rng.randint(*count_range)
    for offset in range(n_spurious):
        spur = generate_spurious_annotation(existing_bboxes, rng)
        spur["id"] = first_id + offset
        corrupted.append(spur)
        # Later spurious boxes should also avoid the ones just added.
        existing_bboxes.append(spur["bbox"])
        suffix = f" ({spur['class_label']})" if log_class else ""
        log.append(f"Added spurious ann {spur['id']}{suffix}")


# difficulty -> (per-annotation corrupter, corruption rate, minimum number of
# corrupted annotations, (min, max) spurious annotations, log spurious class?)
_DIFFICULTY_PROFILES = {
    "easy": (_corrupt_easy, 0.35, 1, (2, 3), True),
    "medium": (_corrupt_medium, 0.30, 2, (3, 4), True),
    "hard": (_corrupt_hard, 0.25, 2, (2, 3), False),
}


def corrupt_annotations(
    gold_annotations: List[Dict],
    difficulty: str,
    seed: int,
) -> Tuple[List[Dict], List[str]]:
    """
    Corrupt gold annotations based on difficulty level.

    A deep copy of ``gold_annotations`` is corrupted, so the input is never
    mutated. All randomness comes from ``random.Random(seed)``, so the same
    inputs always produce the same output.

    Args:
        gold_annotations: Dicts with ``"id"``, ``"bbox"`` and ``"class_label"`` keys.
        difficulty: ``"easy"``, ``"medium"`` or ``"hard"``. Any other value
            returns an uncorrupted copy and an empty log.
        seed: Seed for the random generator.

    Returns:
        (corrupted_annotations, corruption_log)
        corruption_log: list of strings describing what was corrupted (for debugging)
    """
    rng = random.Random(seed)
    corrupted = copy.deepcopy(gold_annotations)
    log: List[str] = []

    profile = _DIFFICULTY_PROFILES.get(difficulty)
    if profile is None:
        return corrupted, log
    corrupt_one, rate, minimum, spurious_range, log_class = profile

    # Number spurious annotations above the highest *gold* id. Taking the
    # maximum after deletions would let a spurious annotation reuse the id of
    # a deleted gold annotation whenever the highest id was the one removed.
    first_spurious_id = max((a["id"] for a in gold_annotations), default=0) + 1

    for idx in _pick_indices(len(corrupted), rate, minimum, rng):
        keep, message = corrupt_one(corrupted[idx], rng)
        log.append(message)
        if not keep:
            corrupted[idx] = None  # mark for removal

    # Remove deleted
    corrupted = [a for a in corrupted if a is not None]

    _add_spurious(corrupted, rng, spurious_range, first_spurious_id, log, log_class)
    return corrupted, log