Spaces:
Sleeping
Sleeping
| import torch | |
| import numpy as np | |
| import cv2 | |
| import torchvision | |
| import argparse | |
| import random | |
| import os | |
| import yaml | |
| from tqdm import tqdm | |
| from dataset.st import SceneTextDataset | |
| from torch.utils.data.dataloader import DataLoader | |
| import detection | |
| from detection.faster_rcnn import FastRCNNPredictor | |
| from shapely.geometry import Polygon | |
| from detection.anchor_utils import AnchorGenerator | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
def get_iou(det, gt):
    """Return the intersection-over-union of two rotated rectangles.

    Both ``det`` and ``gt`` are 5-element sequences ``(x, y, w, h, theta)``.
    ``(x, y)`` is used as the rectangle centre, ``w`` and ``h`` as its full
    width and height, and ``theta`` is passed straight to ``np.cos`` /
    ``np.sin`` (so it is interpreted in radians).

    Returns ``0.0`` when the rectangles do not intersect, otherwise the
    intersection area divided by the union area (with a ``1E-6`` term added
    to the union to avoid division by zero).
    """
    d_cx, d_cy, d_width, d_height, d_angle = det
    g_cx, g_cy, g_width, g_height, g_angle = gt

    def _as_polygon(cx, cy, width, height, angle):
        # Corners of the axis-aligned rectangle centred on the origin.
        half_w, half_h = width / 2, height / 2
        local_corners = np.array([
            [-half_w, -half_h], [half_w, -half_h], [half_w, half_h], [-half_w, half_h]
        ])
        cos_a, sin_a = np.cos(angle), np.sin(angle)
        rotation = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
        # Rotate about the origin, then translate to the box centre.
        world_corners = np.dot(local_corners, rotation.T) + np.array([cx, cy])
        return Polygon(world_corners)

    det_shape = _as_polygon(d_cx, d_cy, d_width, d_height, d_angle)
    gt_shape = _as_polygon(g_cx, g_cy, g_width, g_height, g_angle)

    if not det_shape.intersects(gt_shape):
        return 0.0

    overlap = det_shape.intersection(gt_shape).area
    combined = det_shape.area + gt_shape.area - overlap + 1E-6
    return overlap / combined
def _average_precision(recalls, precisions, method):
    """Compute AP from cumulative recall/precision arrays.

    Returns ``(ap, recalls, precisions)``. For ``method == "area"`` the
    returned arrays are the padded, monotone-envelope versions used for the
    integration; for ``"interp"`` they are returned unchanged.
    """
    if method == "area":
        # Pad so the curve spans recall 0..1, then replace precision with its
        # running maximum taken from the right.
        recalls = np.concatenate(([0.0], recalls, [1.0]))
        precisions = np.concatenate(([0.0], precisions, [0.0]))
        for i in range(len(precisions) - 1, 0, -1):
            precisions[i - 1] = np.maximum(precisions[i - 1], precisions[i])
        # Sum rectangle areas at the points where recall changes.
        steps = np.where(recalls[1:] != recalls[:-1])[0]
        ap = np.sum((recalls[steps + 1] - recalls[steps]) * precisions[steps + 1])
    else:
        # 11-point interpolation: best precision at recall >= t, t = 0, 0.1, ..., 1.
        sampled = []
        for t in np.arange(0, 1.1, 0.1):
            reached = recalls >= t
            sampled.append(precisions[reached].max() if reached.any() else 0)
        ap = sum(sampled) / 11.0
    return ap, recalls, precisions


def compute_map(det_boxes, gt_boxes, iou_threshold=0.5, method="area", return_pr=False):
    """Compute mean average precision over all ground-truth classes.

    Args:
        det_boxes: one dict per image mapping class label -> list of
            detections. The last element of each detection is its confidence
            score; the preceding elements are passed to ``get_iou``.
        gt_boxes: one dict per image (same order as ``det_boxes``) mapping
            class label -> list of ground-truth boxes, each passed to
            ``get_iou`` as-is. A label may be absent from an image's dict,
            which is treated as "no boxes of that class in this image".
        iou_threshold: minimum IoU for a detection to match a ground truth.
        method: ``"area"`` (area under the precision envelope) or
            ``"interp"`` (11-point interpolation).
        return_pr: when true, also return per-class precision/recall lists.

    Returns:
        ``(mean_ap, all_aps)`` or, with ``return_pr``,
        ``(mean_ap, all_aps, all_precisions, all_recalls)``. Classes that
        have no ground-truth boxes get ``nan`` AP and empty PR lists and are
        excluded from the mean. ``mean_ap`` is ``0.0`` if no class has GT.

    Raises:
        ValueError: if ``method`` is not ``"area"`` or ``"interp"``.
    """
    # Reject a bad method before doing any matching work.
    if method not in ("area", "interp"):
        raise ValueError("Method must be 'area' or 'interp'")

    gt_labels = sorted({cls_key for im_gt in gt_boxes for cls_key in im_gt})
    all_aps = {}
    all_precisions = {}
    all_recalls = {}
    aps = []

    for label in gt_labels:
        # Every detection of this class, tagged with its image index.
        cls_dets = [
            (im_idx, det)
            for im_idx, im_dets in enumerate(det_boxes)
            if label in im_dets
            for det in im_dets[label]
        ]
        # Highest confidence first (stable for ties).
        cls_dets.sort(key=lambda item: -item[1][-1])

        # One "already matched" flag per GT box; images lacking this label
        # simply contribute no boxes.
        gt_matched = [[False] * len(im_gts.get(label, ())) for im_gts in gt_boxes]
        num_gts = sum(len(flags) for flags in gt_matched)

        tp = np.zeros(len(cls_dets))
        fp = np.zeros(len(cls_dets))

        for det_idx, (im_idx, det_pred) in enumerate(cls_dets):
            im_gts = gt_boxes[im_idx].get(label, ())
            best_iou = -1
            best_gt_idx = -1
            # Find the GT box in the same image with the highest IoU.
            for gt_box_idx, gt_box in enumerate(im_gts):
                iou = get_iou(det_pred[:-1], gt_box)
                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = gt_box_idx
            # True positive only if the best GT clears the threshold and has
            # not been claimed by a higher-scoring detection.
            if best_iou < iou_threshold or gt_matched[im_idx][best_gt_idx]:
                fp[det_idx] = 1
            else:
                tp[det_idx] = 1
                gt_matched[im_idx][best_gt_idx] = True

        tp = np.cumsum(tp)
        fp = np.cumsum(fp)
        eps = np.finfo(np.float32).eps
        recalls = tp / np.maximum(num_gts, eps)
        precisions = tp / np.maximum(tp + fp, eps)

        ap, recalls, precisions = _average_precision(recalls, precisions, method)

        if num_gts > 0:
            aps.append(ap)
            all_aps[label] = ap
            all_precisions[label] = precisions.tolist()
            all_recalls[label] = recalls.tolist()
        else:
            all_aps[label] = np.nan
            all_precisions[label] = []
            all_recalls[label] = []

    mean_ap = sum(aps) / len(aps) if aps else 0.0
    if return_pr:
        return mean_ap, all_aps, all_precisions, all_recalls
    return mean_ap, all_aps
def load_model_and_dataset(args):
    """Build the dataset, data loader and Faster R-CNN model for evaluation.

    Reads the YAML config at ``args.config_path``, seeds the RNGs from
    ``train_params.seed``, creates a ``SceneTextDataset`` for
    ``args.split_type``, builds the detector with a box predictor sized by
    ``dataset_params.num_classes`` and ``args.num_theta_bins``, and loads the
    checkpoint ``<task_name>/tv_frcnn_r50fpn_<ckpt_name>``.

    Returns:
        ``(model, dataset, data_loader)`` with the model in eval mode on the
        module-level ``device``.

    Raises:
        yaml.YAMLError: if the config file cannot be parsed.
    """
    with open(args.config_path, "r") as file:
        try:
            config = yaml.safe_load(file)
        except yaml.YAMLError as exc:
            print(exc)
            # Re-raise: continuing would only fail later on the unbound
            # ``config`` and hide the real parsing error.
            raise
    print(config)

    dataset_config = config["dataset_params"]
    model_config = config["model_params"]  # read so a missing section fails early
    train_config = config["train_params"]

    seed = train_config["seed"]
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    # Compare the device *type*; a torch.device does not compare equal to a
    # bare string, so ``device == "cuda"`` never took this branch.
    if device.type == "cuda":
        torch.cuda.manual_seed_all(seed)

    st = SceneTextDataset(args.split_type, root_dir=dataset_config["root_dir"])
    test_dataset = DataLoader(st, batch_size=1, shuffle=False)

    faster_rcnn_model = detection.fasterrcnn_resnet50_fpn(
        pretrained=True,
        min_size=600,
        max_size=1000,
        box_score_thresh=0.7,
    )
    # Swap in a predictor head sized for this dataset's classes/theta bins.
    faster_rcnn_model.roi_heads.box_predictor = FastRCNNPredictor(
        faster_rcnn_model.roi_heads.box_predictor.cls_score.in_features,
        num_classes=dataset_config["num_classes"],
        num_theta_bins=args.num_theta_bins,
    )
    faster_rcnn_model.eval()
    faster_rcnn_model.to(device)

    checkpoint_path = os.path.join(
        train_config["task_name"],
        "tv_frcnn_r50fpn_" + train_config["ckpt_name"],
    )
    # Tensors are deserialized on CPU and copied into the model's parameters.
    faster_rcnn_model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    return faster_rcnn_model, st, test_dataset
def evaluate_metrics(args):
    """Run the detector over the dataset and print/return mAP, precision, recall.

    For every sample the predicted and ground-truth boxes are grouped per
    class name and handed to ``compute_map`` (11-point ``"interp"`` method).
    Per-class "precision" and "recall" are the plain averages of the values
    in the precision/recall lists returned by ``compute_map``.

    Returns:
        ``(mean_ap, mean_precision, mean_recall)``.
    """

    def _as_center_box(box, theta):
        # The model and dataset give corner boxes (x1, y1, x2, y2), while
        # get_iou expects (centre x, centre y, width, height, theta).
        x1, y1, x2, y2 = box.detach().cpu().numpy()
        return [(x1 + x2) / 2, (y1 + y2) / 2, x2 - x1, y2 - y1, theta]

    faster_rcnn_model, voc, test_dataset = load_model_and_dataset(args)
    gts = []
    preds = []
    for im, target, _ in tqdm(test_dataset):
        im = im.float().to(device)
        # The loader uses batch_size=1, so index 0 selects the only sample.
        target_boxes = target["bboxes"].float()[0]
        target_labels = target["labels"].long()[0]
        target_thetas = target["thetas"].float()[0]

        # Pure inference: no autograd graph needed.
        with torch.no_grad():
            frcnn_output = faster_rcnn_model(im, None)[0]
        boxes = frcnn_output["boxes"]
        labels = frcnn_output["labels"]
        scores = frcnn_output["scores"]
        thetas = frcnn_output["thetas"]

        pred_boxes = {label_name: [] for label_name in voc.label2idx}
        gt_boxes = {label_name: [] for label_name in voc.label2idx}

        for idx, box in enumerate(boxes):
            label_name = voc.idx2label[labels[idx].detach().cpu().item()]
            theta = thetas[idx].detach().cpu().item()
            score = scores[idx].detach().cpu().item()
            # Score goes last: compute_map sorts on the final element.
            pred_boxes[label_name].append(_as_center_box(box, theta) + [score])

        for idx, box in enumerate(target_boxes):
            label_name = voc.idx2label[target_labels[idx].detach().cpu().item()]
            theta = target_thetas[idx].detach().cpu().item()
            gt_boxes[label_name].append(_as_center_box(box, theta))

        gts.append(gt_boxes)
        preds.append(pred_boxes)

    # Compute Mean Average Precision and Precision-Recall values
    mean_ap, all_aps, precisions, recalls = compute_map(
        preds, gts, method="interp", return_pr=True
    )

    mean_precision = 0
    mean_recall = 0
    num_classes = len(voc.idx2label)
    for idx in range(num_classes):
        class_name = voc.idx2label[idx]
        ap = all_aps[class_name]
        prec = precisions[class_name]
        rec = recalls[class_name]
        # Classes without ground truth come back with empty lists -> count as 0.
        class_precision = sum(prec) / len(prec) if len(prec) > 0 else 0
        class_recall = sum(rec) / len(rec) if len(rec) > 0 else 0
        mean_precision += class_precision
        mean_recall += class_recall
        print(f"Class: {class_name}")
        print(
            f" AP: {ap:.4f}, Precision: {class_precision:.4f}, Recall: {class_recall:.4f}"
        )

    mean_precision /= num_classes
    mean_recall /= num_classes
    print(f"Mean Average Precision (mAP): {mean_ap:.4f}")
    print(f"Mean Precision: {mean_precision:.4f}")
    print(f"Mean Recall: {mean_recall:.4f}")
    return mean_ap, mean_precision, mean_recall
def _draw_rotated_boxes(image, boxes, thetas):
    """Draw each (x1, y1, x2, y2) box rotated by its theta onto ``image`` in place."""
    overlay = image.copy()
    for idx, box in enumerate(boxes):
        x1, y1, x2, y2 = (int(v) for v in box.detach().cpu().numpy())
        # Thetas are in radians; cv2.boxPoints takes the angle in degrees.
        angle_deg = thetas[idx].detach().cpu().item() * 180 / np.pi
        cx, cy, w, h = (x1 + x2) / 2, (y1 + y2) / 2, x2 - x1, y2 - y1
        corners = cv2.boxPoints(((cx, cy), (w, h), angle_deg)).astype(np.int32)
        cv2.drawContours(image, [corners], 0, (0, 255, 0), 2)
        cv2.drawContours(overlay, [corners], 0, (0, 255, 0), 2)
    cv2.addWeighted(overlay, 0.7, image, 0.3, 0, image)
    return image


def infer(args):
    """Save ground-truth and predicted box visualizations for 10 random samples.

    Images are written to ``samples_tv_r50fpn/`` as
    ``output_frcnn_gt_<n>.png`` (ground truth) and ``output_frcnn_<n>.jpg``
    (model predictions).
    """
    output_dir = "samples_tv_r50fpn"
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(output_dir, exist_ok=True)
    faster_rcnn_model, voc, test_dataset = load_model_and_dataset(args)

    for sample_count in tqdm(range(10)):
        # randrange excludes len(voc); randint(0, len(voc)) could index one
        # past the last sample.
        random_idx = random.randrange(len(voc))
        im, target, fname = voc[random_idx]
        im = im.unsqueeze(0).float().to(device)

        # Saving images with ground truth boxes
        gt_im = _draw_rotated_boxes(cv2.imread(fname), target["bboxes"], target["thetas"])
        cv2.imwrite("{}/output_frcnn_gt_{}.png".format(output_dir, sample_count), gt_im)

        # Getting predictions from trained model (inference only, no autograd)
        with torch.no_grad():
            frcnn_output = faster_rcnn_model(im, None)[0]

        # Saving images with predicted boxes
        pred_im = _draw_rotated_boxes(
            cv2.imread(fname), frcnn_output["boxes"], frcnn_output["thetas"]
        )
        cv2.imwrite("{}/output_frcnn_{}.jpg".format(output_dir, sample_count), pred_im)
def str2bool(value):
    """Parse a command-line boolean such as ``True``/``false``/``1``/``no``.

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, so
    ``--evaluate False`` would be truthy. This converter maps the usual
    spellings explicitly and rejects anything else.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "1"):
        return True
    # The empty string stays falsy, as it was under ``type=bool``.
    if text in ("false", "f", "no", "n", "0", ""):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value (true/false), got {!r}".format(value)
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Arguments for inference using torchvision code faster rcnn"
    )
    parser.add_argument(
        "--config", dest="config_path", default="config/st.yaml", type=str
    )
    parser.add_argument("--evaluate", dest="evaluate", default=False, type=str2bool)
    parser.add_argument(
        "--infer_samples", dest="infer_samples", default=True, type=str2bool
    )
    args = parser.parse_args()
    # Not exposed on the command line: dataset split and number of angle bins.
    args.split_type = "train"
    args.num_theta_bins = 359

    if args.infer_samples:
        infer(args)
    else:
        print("Not Inferring for samples as `infer_samples` argument is False")

    if args.evaluate:
        evaluate_metrics(args)
    else:
        print("Not Evaluating as `evaluate` argument is False")