| |
| """ |
| OCULUS Detection Head Training |
| |
| Trains the detection (box) and point heads on COCO detection data. |
| Uses the frozen vision encoders + trained projector, only trains the heads. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import random |
| from pathlib import Path |
| from dataclasses import dataclass |
| from typing import List, Dict, Tuple, Optional |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.utils.data import Dataset, DataLoader |
| from PIL import Image |
|
|
| OCULUS_ROOT = Path(__file__).parent |
|
|
| |
| sys.path.insert(0, str(OCULUS_ROOT)) |
|
|
| from oculus_unified_model import OculusForConditionalGeneration, OculusConfig |
|
|
|
|
| @dataclass |
| class DetectionTrainingConfig: |
| """Training configuration.""" |
| |
| data_dir: str = "data/coco" |
| annotations_file: str = "annotations/instances_train2017.json" |
| images_subdir: str = "images" |
| |
| |
| batch_size: int = 4 |
| learning_rate: float = 1e-4 |
| num_epochs: int = 3 |
| warmup_steps: int = 100 |
| max_samples: int = 3000 |
| |
| |
| checkpoint_path: str = "checkpoints/oculus_coco/final" |
| |
| |
| save_every: int = 200 |
| checkpoint_dir: str = "checkpoints/oculus_detection" |
| |
| |
| log_every: int = 25 |
|
|
|
|
| class COCODetectionDataset(Dataset): |
| """COCO Detection dataset.""" |
| |
| |
| COCO_CLASSES = [ |
| 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', |
| 'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', |
| 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', |
| 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', |
| 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', |
| 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', |
| 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', |
| 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', |
| 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', |
| 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', |
| 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', |
| 'toothbrush' |
| ] |
| |
| def __init__(self, data_dir: str, annotations_file: str, images_subdir: str, max_samples: int = None): |
| self.data_dir = Path(data_dir) |
| self.images_dir = self.data_dir / images_subdir |
| |
| |
| annotations_path = self.data_dir / annotations_file |
| print(f" Loading annotations from {annotations_path}...") |
| |
| with open(annotations_path) as f: |
| coco_data = json.load(f) |
| |
| |
| self.cat_id_to_idx = {} |
| for i, cat in enumerate(coco_data['categories']): |
| self.cat_id_to_idx[cat['id']] = i |
| |
| |
| img_to_anns = {} |
| for ann in coco_data['annotations']: |
| img_id = ann['image_id'] |
| if img_id not in img_to_anns: |
| img_to_anns[img_id] = [] |
| img_to_anns[img_id].append(ann) |
| |
| |
| self.samples = [] |
| for img_info in coco_data['images']: |
| img_id = img_info['id'] |
| if img_id not in img_to_anns: |
| continue |
| |
| |
| img_path = self.images_dir / img_info['file_name'] |
| if not img_path.exists(): |
| continue |
| |
| anns = img_to_anns[img_id] |
| |
| |
| boxes = [] |
| labels = [] |
| for ann in anns: |
| if 'bbox' not in ann or ann.get('iscrowd', 0): |
| continue |
| |
| |
| x, y, w, h = ann['bbox'] |
| |
| |
| x1 = x / img_info['width'] |
| y1 = y / img_info['height'] |
| x2 = (x + w) / img_info['width'] |
| y2 = (y + h) / img_info['height'] |
| |
| |
| x1, y1, x2, y2 = max(0, x1), max(0, y1), min(1, x2), min(1, y2) |
| |
| boxes.append([x1, y1, x2, y2]) |
| labels.append(self.cat_id_to_idx[ann['category_id']]) |
| |
| if boxes: |
| self.samples.append({ |
| 'image_path': str(img_path), |
| 'boxes': boxes, |
| 'labels': labels, |
| 'width': img_info['width'], |
| 'height': img_info['height'] |
| }) |
| |
| if max_samples and len(self.samples) >= max_samples: |
| break |
| |
| print(f" Loaded {len(self.samples):,} images with detections") |
| |
| def __len__(self): |
| return len(self.samples) |
| |
| def __getitem__(self, idx): |
| return self.samples[idx] |
|
|
|
|
| class DetectionTrainer: |
| """Trainer for detection heads.""" |
| |
| def __init__(self, config: DetectionTrainingConfig): |
| self.config = config |
| |
| print("\n" + "=" * 60) |
| print("🎯 OCULUS DETECTION TRAINER") |
| print("=" * 60) |
| |
| self._load_model() |
| self._load_dataset() |
| self._create_optimizer() |
| |
| self.checkpoint_dir = Path(config.checkpoint_dir) |
| self.checkpoint_dir.mkdir(parents=True, exist_ok=True) |
| |
| def _load_model(self): |
| """Load model with trained projector.""" |
| print("\n[Loading Model]") |
| |
| checkpoint_path = OCULUS_ROOT / self.config.checkpoint_path |
| self.model = OculusForConditionalGeneration.from_pretrained(checkpoint_path) |
| |
| |
| self.model.vision_encoder.load_encoders() |
| |
| |
| for param in self.model.vision_encoder.parameters(): |
| param.requires_grad = False |
| for param in self.model.projector.parameters(): |
| param.requires_grad = False |
| |
| |
| for param in self.model.detection_head.parameters(): |
| param.requires_grad = True |
| for param in self.model.point_head.parameters(): |
| param.requires_grad = True |
| |
| |
| trainable = sum(p.numel() for p in self.model.parameters() if p.requires_grad) |
| total = sum(p.numel() for p in self.model.parameters()) |
| print(f" ✓ Trainable: {trainable:,} / {total:,} parameters") |
| |
| def _load_dataset(self): |
| """Load COCO detection dataset.""" |
| print("\n[Loading Dataset]") |
| self.dataset = COCODetectionDataset( |
| self.config.data_dir, |
| self.config.annotations_file, |
| self.config.images_subdir, |
| max_samples=self.config.max_samples |
| ) |
| |
| def _create_optimizer(self): |
| """Create optimizer for detection heads only.""" |
| print("\n[Optimizer]") |
| |
| |
| params = list(self.model.detection_head.parameters()) + \ |
| list(self.model.point_head.parameters()) |
| |
| if self.model.vision_adapter is not None: |
| params += list(self.model.vision_adapter.parameters()) |
| |
| self.optimizer = torch.optim.AdamW(params, lr=self.config.learning_rate, weight_decay=0.01) |
| print(f" ✓ AdamW (lr={self.config.learning_rate})") |
| |
| def encode_image(self, image_path: str) -> torch.Tensor: |
| """Encode image to vision tokens.""" |
| image = Image.open(image_path).convert('RGB') |
| |
| with torch.no_grad(): |
| vision_tokens = self.model.encode_image(image) |
| |
| return vision_tokens |
| |
| def compute_detection_loss( |
| self, |
| vision_tokens: torch.Tensor, |
| target_boxes: List[List[float]], |
| target_labels: List[int] |
| ) -> Tuple[torch.Tensor, Dict]: |
| """Compute detection loss.""" |
| |
| |
| cls_logits, box_preds = self.model.detection_head(vision_tokens) |
| |
| batch_size = vision_tokens.shape[0] |
| num_tokens = vision_tokens.shape[1] |
| |
| |
| total_cls_loss = 0 |
| total_box_loss = 0 |
| num_matches = 0 |
| |
| target_boxes_t = torch.tensor(target_boxes, dtype=torch.float32) |
| target_labels_t = torch.tensor(target_labels, dtype=torch.long) |
| |
| for i in range(batch_size): |
| if len(target_boxes) == 0: |
| continue |
| |
| |
| pred_boxes = box_preds[i] |
| pred_cls = cls_logits[i] |
| |
| |
| for gt_idx, (gt_box, gt_label) in enumerate(zip(target_boxes, target_labels)): |
| gt_box_t = torch.tensor(gt_box, dtype=torch.float32) |
| |
| |
| ious = self._compute_iou(pred_boxes, gt_box_t.unsqueeze(0).expand(num_tokens, -1)) |
| |
| |
| best_idx = ious.argmax() |
| |
| |
| cls_loss = F.cross_entropy( |
| pred_cls[best_idx:best_idx+1], |
| torch.tensor([gt_label], dtype=torch.long) |
| ) |
| |
| |
| box_loss = F.l1_loss(pred_boxes[best_idx], gt_box_t) |
| |
| total_cls_loss += cls_loss |
| total_box_loss += box_loss |
| num_matches += 1 |
| |
| if num_matches > 0: |
| total_cls_loss /= num_matches |
| total_box_loss /= num_matches |
| |
| |
| total_loss = total_cls_loss + 5.0 * total_box_loss |
| |
| return total_loss, { |
| 'cls_loss': float(total_cls_loss) if num_matches > 0 else 0, |
| 'box_loss': float(total_box_loss) if num_matches > 0 else 0, |
| 'num_matches': num_matches |
| } |
| |
| def _compute_iou(self, boxes1: torch.Tensor, boxes2: torch.Tensor) -> torch.Tensor: |
| """Compute IoU between two sets of boxes.""" |
| |
| x1 = torch.max(boxes1[:, 0], boxes2[:, 0]) |
| y1 = torch.max(boxes1[:, 1], boxes2[:, 1]) |
| x2 = torch.min(boxes1[:, 2], boxes2[:, 2]) |
| y2 = torch.min(boxes1[:, 3], boxes2[:, 3]) |
| |
| inter_area = torch.clamp(x2 - x1, min=0) * torch.clamp(y2 - y1, min=0) |
| |
| area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1]) |
| area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1]) |
| |
| union_area = area1 + area2 - inter_area + 1e-8 |
| |
| return inter_area / union_area |
| |
| def train_step(self, sample: Dict) -> Tuple[float, Dict]: |
| """Single training step.""" |
| |
| self.optimizer.zero_grad() |
| |
| try: |
| |
| image = Image.open(sample['image_path']).convert('RGB') |
| |
| |
| with torch.no_grad(): |
| vision_features = self.model.vision_encoder(image) |
| |
| |
| actual_dim = vision_features.shape[-1] |
| expected_dim = self.model.config.fused_vision_dim |
| |
| if actual_dim != expected_dim: |
| if self.model.vision_adapter is None: |
| print(f" [Adapter] Creating: {actual_dim} -> {expected_dim}") |
| self.model.vision_adapter = nn.Linear(actual_dim, expected_dim) |
| nn.init.xavier_uniform_(self.model.vision_adapter.weight) |
| nn.init.zeros_(self.model.vision_adapter.bias) |
| |
| |
| self.optimizer.add_param_group({ |
| 'params': self.model.vision_adapter.parameters() |
| }) |
| |
| vision_features = self.model.vision_adapter(vision_features) |
| |
| |
| vision_tokens = self.model.projector(vision_features) |
| |
| |
| loss, metrics = self.compute_detection_loss( |
| vision_tokens, |
| sample['boxes'], |
| sample['labels'] |
| ) |
| |
| if loss.requires_grad: |
| loss.backward() |
| self.optimizer.step() |
| |
| return float(loss), metrics |
| |
| except Exception as e: |
| print(f" ⚠️ Error: {e}") |
| return 0.0, {} |
| |
| def save_checkpoint(self, step: int, loss: float): |
| """Save checkpoint.""" |
| checkpoint_path = self.checkpoint_dir / f"step_{step:06d}" |
| checkpoint_path.mkdir(exist_ok=True) |
| |
| |
| torch.save({ |
| 'detection': self.model.detection_head.state_dict(), |
| 'point': self.model.point_head.state_dict(), |
| 'adapter': self.model.vision_adapter.state_dict() if self.model.vision_adapter else None, |
| }, checkpoint_path / "heads.pth") |
| |
| |
| state = {'step': step, 'loss': loss} |
| with open(checkpoint_path / "state.json", "w") as f: |
| json.dump(state, f, indent=2) |
| |
| print(f" 💾 Checkpoint: {checkpoint_path}") |
| |
| def train(self): |
| """Main training loop.""" |
| print("\n" + "=" * 60) |
| print("🚀 STARTING DETECTION TRAINING") |
| print("=" * 60) |
| print(f" Dataset: {len(self.dataset):,} samples") |
| print(f" Epochs: {self.config.num_epochs}") |
| print(f" Learning rate: {self.config.learning_rate}") |
| |
| global_step = 0 |
| best_loss = float('inf') |
| start_time = time.time() |
| |
| for epoch in range(self.config.num_epochs): |
| print(f"\n📚 Epoch {epoch + 1}/{self.config.num_epochs}") |
| print("-" * 40) |
| |
| |
| indices = list(range(len(self.dataset))) |
| random.shuffle(indices) |
| |
| epoch_loss = 0 |
| epoch_box_loss = 0 |
| epoch_cls_loss = 0 |
| num_batches = 0 |
| |
| for i, idx in enumerate(indices): |
| sample = self.dataset[idx] |
| |
| loss, metrics = self.train_step(sample) |
| |
| if loss == 0: |
| continue |
| |
| epoch_loss += loss |
| epoch_box_loss += metrics.get('box_loss', 0) |
| epoch_cls_loss += metrics.get('cls_loss', 0) |
| num_batches += 1 |
| global_step += 1 |
| |
| |
| if global_step % self.config.log_every == 0: |
| elapsed = time.time() - start_time |
| avg_loss = epoch_loss / num_batches |
| print(f" Step {global_step:5d} | Loss: {loss:.4f} | " |
| f"Avg: {avg_loss:.4f} | Box: {metrics.get('box_loss', 0):.4f} | " |
| f"Cls: {metrics.get('cls_loss', 0):.4f} | {elapsed:.0f}s") |
| |
| |
| if global_step % self.config.save_every == 0: |
| self.save_checkpoint(global_step, loss) |
| if loss < best_loss: |
| best_loss = loss |
| |
| avg_epoch_loss = epoch_loss / max(num_batches, 1) |
| print(f"\n ✓ Epoch {epoch + 1} | Avg loss: {avg_epoch_loss:.4f} | " |
| f"Box: {epoch_box_loss/max(num_batches,1):.4f} | " |
| f"Cls: {epoch_cls_loss/max(num_batches,1):.4f}") |
| |
| |
| print("\n" + "=" * 60) |
| print("💾 Saving Final Model") |
| print("=" * 60) |
| |
| final_path = self.checkpoint_dir / "final" |
| final_path.mkdir(exist_ok=True) |
| |
| |
| torch.save({ |
| 'detection': self.model.detection_head.state_dict(), |
| 'point': self.model.point_head.state_dict(), |
| 'adapter': self.model.vision_adapter.state_dict() if self.model.vision_adapter else None, |
| }, final_path / "heads.pth") |
| |
| |
| import shutil |
| src_projector = OCULUS_ROOT / self.config.checkpoint_path / "projector.npz" |
| src_config = OCULUS_ROOT / self.config.checkpoint_path / "config.json" |
| if src_projector.exists(): |
| shutil.copy(src_projector, final_path / "projector.npz") |
| if src_config.exists(): |
| shutil.copy(src_config, final_path / "config.json") |
| |
| print(f"✅ Training complete! Model: {final_path}") |
| return final_path |
|
|
|
|
| def main(): |
| config = DetectionTrainingConfig( |
| data_dir="data/coco", |
| max_samples=2000, |
| num_epochs=2, |
| learning_rate=5e-4, |
| save_every=200, |
| log_every=25, |
| ) |
| |
| trainer = DetectionTrainer(config) |
| trainer.train() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|