""" ADE20K Semantic Segmentation Evaluation 评估超分图片在语义分割任务上的表现 Metrics: - mIoU (mean Intersection over Union) - Pixel Accuracy - Mean Accuracy """ import os import json from pathlib import Path from typing import Dict, List, Tuple import torch import torch.nn.functional as F from torch.utils.data import DataLoader, Dataset from torchvision.transforms import functional as TF from PIL import Image import numpy as np from tqdm import tqdm # ============================================================================ # 配置参数 - 请修改这里 # ============================================================================ CONFIG = { # SR images 目录 (你的超分模型输出,512x512) 'sr_dir': '/home/wanghongbo06/baipurui/results/ADE20K/DreamClear/results/output', # GT annotations 目录 (prepare_ade_patches.py 生成的) 'gt_ann_dir': '/home/wanghongbo06/baipurui/DATA/ADE20K_patch/gt_ann', # 推理配置 'device': 'cuda', 'batch_size': 4, 'num_workers': 4, # 是否打印每类 IoU 'print_per_class': False, # 输出 'output': './results/ade_eval_results.json', } # ============================================================================ # ADE20K 150 类别名称 ADE20K_CLASSES = [ 'wall', 'building', 'sky', 'floor', 'tree', 'ceiling', 'road', 'bed', 'windowpane', 'grass', 'cabinet', 'sidewalk', 'person', 'earth', 'door', 'table', 'mountain', 'plant', 'curtain', 'chair', 'car', 'water', 'painting', 'sofa', 'shelf', 'house', 'sea', 'mirror', 'rug', 'field', 'armchair', 'seat', 'fence', 'desk', 'rock', 'wardrobe', 'lamp', 'bathtub', 'railing', 'cushion', 'base', 'box', 'column', 'signboard', 'chest of drawers', 'counter', 'sand', 'sink', 'skyscraper', 'fireplace', 'refrigerator', 'grandstand', 'path', 'stairs', 'runway', 'case', 'pool table', 'pillow', 'screen door', 'stairway', 'river', 'bridge', 'bookcase', 'blind', 'coffee table', 'toilet', 'flower', 'book', 'hill', 'bench', 'countertop', 'stove', 'palm', 'kitchen island', 'computer', 'swivel chair', 'boat', 'bar', 'arcade machine', 'hovel', 'bus', 'towel', 'light', 'truck', 'tower', 'chandelier', 'awning', 'streetlight', 'booth', 'television receiver', 'airplane', 'dirt track', 'apparel', 'pole', 'land', 'bannister', 'escalator', 'ottoman', 'bottle', 'buffet', 'poster', 'stage', 'van', 'ship', 'fountain', 'conveyer belt', 'canopy', 'washer', 'plaything', 'swimming pool', 'stool', 'barrel', 'basket', 'waterfall', 'tent', 'bag', 'minibike', 'cradle', 'oven', 'ball', 'food', 'step', 'tank', 'trade name', 'microwave', 'pot', 'animal', 'bicycle', 'lake', 'dishwasher', 'screen', 'blanket', 'sculpture', 'hood', 'sconce', 'vase', 'traffic light', 'tray', 'ashcan', 'fan', 'pier', 'crt screen', 'plate', 'monitor', 'bulletin board', 'shower', 'radiator', 'glass', 'clock', 'flag' ] NUM_CLASSES = 150 class ADE20KDataset(Dataset): """ADE20K 数据集""" def __init__(self, sr_dir: str, gt_ann_dir: str): self.sr_dir = Path(sr_dir) self.gt_ann_dir = Path(gt_ann_dir) # 从 GT annotation 目录获取文件列表 self.files = sorted([f.stem for f in self.gt_ann_dir.glob('*.png')]) # 过滤出存在的 SR 图片 valid_files = [] for name in self.files: sr_path = self.sr_dir / f"{name}.png" if not sr_path.exists(): sr_path = self.sr_dir / f"{name}.jpg" if sr_path.exists(): valid_files.append(name) missing = len(self.files) - len(valid_files) if missing > 0: print(f"Warning: {missing} SR images not found, skipped.") self.files = valid_files print(f"Found {len(self.files)} valid image pairs") def __len__(self): return len(self.files) def __getitem__(self, idx: int) -> Dict: name = self.files[idx] # SR 图片 sr_path = self.sr_dir / f"{name}.png" if not sr_path.exists(): sr_path = self.sr_dir / f"{name}.jpg" sr_img = Image.open(sr_path).convert('RGB') sr_tensor = TF.to_tensor(sr_img) # GT 标注 gt_path = self.gt_ann_dir / f"{name}.png" gt = Image.open(gt_path) gt = np.array(gt, dtype=np.int64) return { 'image': sr_tensor, 'gt': gt, 'name': name, 'size': (sr_img.height, sr_img.width), } def collate_fn(batch): return batch class SegmentationModel: """SegFormer 语义分割模型""" def __init__(self, device: str = 'cuda'): self.device = device try: from transformers import SegformerForSemanticSegmentation, SegformerImageProcessor print("Loading SegFormer-B5 pretrained on ADE20K...") model_id = "nvidia/segformer-b5-finetuned-ade-640-640" self.processor = SegformerImageProcessor.from_pretrained(model_id) self.model = SegformerForSemanticSegmentation.from_pretrained(model_id) self.model.eval() self.model.to(device) except ImportError: raise ImportError( "Please install transformers: pip install transformers" ) @torch.no_grad() def predict(self, images: List[torch.Tensor], sizes: List[Tuple[int, int]]) -> List[np.ndarray]: """ 运行语义分割推理 Args: images: List of image tensors [C, H, W] sizes: List of original sizes (H, W) Returns: List of prediction arrays [H, W] with class indices (0-149) """ predictions = [] for image, size in zip(images, sizes): # 转换为 PIL Image image_pil = TF.to_pil_image(image) # 预处理 inputs = self.processor(images=image_pil, return_tensors="pt") inputs = {k: v.to(self.device) for k, v in inputs.items()} # 推理 outputs = self.model(**inputs) logits = outputs.logits # [1, num_classes, H, W] # 上采样到原始尺寸 logits = F.interpolate( logits, size=size, mode='bilinear', align_corners=False ) # 获取预测类别 (0-149) pred = logits.argmax(dim=1).squeeze(0).cpu().numpy() predictions.append(pred) return predictions class mIoUMetric: """mIoU 计算器""" def __init__(self, num_classes: int = 150, ignore_index: int = 0): """ Args: num_classes: 类别数 (ADE20K 为 150) ignore_index: 忽略的索引 (ADE20K 中 0 为 unlabeled) """ self.num_classes = num_classes self.ignore_index = ignore_index self.confusion_matrix = np.zeros((num_classes, num_classes), dtype=np.int64) def reset(self): self.confusion_matrix = np.zeros((self.num_classes, self.num_classes), dtype=np.int64) def update(self, pred: np.ndarray, gt: np.ndarray): """ 更新混淆矩阵 Args: pred: 预测数组 [H, W],值为 0-149 gt: GT 数组 [H, W],值为 0-150 (0=unlabeled, 1-150=classes) """ # ADE20K GT: 0=unlabeled, 1-150=classes # SegFormer pred: 0-149=classes # 需要将 GT 从 1-150 映射到 0-149,忽略 0 # 创建 mask:忽略 unlabeled 区域 valid_mask = gt != self.ignore_index # GT 转换: 1-150 -> 0-149 gt_mapped = gt - 1 # 只统计 valid 区域 pred_valid = pred[valid_mask] gt_valid = gt_mapped[valid_mask] # 过滤无效值 valid_indices = (gt_valid >= 0) & (gt_valid < self.num_classes) & \ (pred_valid >= 0) & (pred_valid < self.num_classes) pred_valid = pred_valid[valid_indices] gt_valid = gt_valid[valid_indices] # 更新混淆矩阵 indices = self.num_classes * gt_valid + pred_valid confusion = np.bincount(indices, minlength=self.num_classes**2) self.confusion_matrix += confusion.reshape(self.num_classes, self.num_classes) def compute(self) -> Dict: """计算 mIoU 和其他指标""" # 每个类别的 IoU intersection = np.diag(self.confusion_matrix) union = (self.confusion_matrix.sum(axis=1) + self.confusion_matrix.sum(axis=0) - intersection) # 避免除零 union = np.maximum(union, 1) iou_per_class = intersection / union # 只计算出现过的类别的 mIoU valid_classes = self.confusion_matrix.sum(axis=1) > 0 # mIoU miou = np.mean(iou_per_class[valid_classes]) # Pixel Accuracy pixel_acc = np.diag(self.confusion_matrix).sum() / max(self.confusion_matrix.sum(), 1) # Mean Accuracy (每类准确率的平均) class_acc = intersection / np.maximum(self.confusion_matrix.sum(axis=1), 1) mean_acc = np.mean(class_acc[valid_classes]) return { 'mIoU': miou, 'pixel_acc': pixel_acc, 'mean_acc': mean_acc, 'iou_per_class': iou_per_class, 'valid_classes': valid_classes, } def main(): device = CONFIG['device'] if device == 'cuda' and not torch.cuda.is_available(): print("CUDA not available, using CPU") device = 'cpu' # 自动生成 output 路径:根据 sr_dir 最后一个目录名 sr_dir = Path(CONFIG['sr_dir']) baseline_name = sr_dir.name # 获取最后一个目录名,如 'sr', 'gt', 'bicubic' 等 output_path = Path(f"./ade_eval_results_{baseline_name}.json") print(f"Loading SR images from: {CONFIG['sr_dir']}") print(f"Loading GT annotations from: {CONFIG['gt_ann_dir']}") print(f"Output will be saved to: {output_path}") # 创建数据集 dataset = ADE20KDataset( sr_dir=CONFIG['sr_dir'], gt_ann_dir=CONFIG['gt_ann_dir'], ) if len(dataset) == 0: print("Error: No valid image pairs found!") return dataloader = DataLoader( dataset, batch_size=CONFIG['batch_size'], shuffle=False, num_workers=CONFIG['num_workers'], collate_fn=collate_fn, ) # 加载模型 model = SegmentationModel(device=device) # 评估 metric = mIoUMetric(num_classes=NUM_CLASSES, ignore_index=0) print("\nRunning inference...") for batch in tqdm(dataloader): images = [item['image'] for item in batch] gts = [item['gt'] for item in batch] sizes = [item['size'] for item in batch] # 推理 predictions = model.predict(images, sizes) # 更新指标 for pred, gt in zip(predictions, gts): metric.update(pred, gt) # 计算最终指标 results = metric.compute() # 打印结果 print("\n" + "="*60) print("EVALUATION SUMMARY") print("="*60) print(f"\n mIoU : {results['mIoU']*100:.2f}") print(f" Pixel Accuracy : {results['pixel_acc']*100:.2f}") print(f" Mean Accuracy : {results['mean_acc']*100:.2f}") # 打印每类 IoU if CONFIG['print_per_class']: print("\n" + "-"*60) print("Per-class IoU:") print("-"*60) iou_per_class = results['iou_per_class'] valid_classes = results['valid_classes'] for i, (class_name, iou, valid) in enumerate(zip( ADE20K_CLASSES, iou_per_class, valid_classes )): if valid: print(f" {i:3d}. {class_name:25s}: {iou*100:6.2f}") # 保存结果 output_path.parent.mkdir(parents=True, exist_ok=True) save_results = { 'mIoU': float(results['mIoU']), 'pixel_acc': float(results['pixel_acc']), 'mean_acc': float(results['mean_acc']), } with open(output_path, 'w') as f: json.dump(save_results, f, indent=2) print(f"\nResults saved to {output_path}") if __name__ == '__main__': main()