from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score
import random
import os
import torch
from ultralytics import YOLO

from .utils.evaluation import ImageEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
from dotenv import load_dotenv

load_dotenv()

router = APIRouter()

# Log the CUDA / cuDNN versions PyTorch was compiled against -- useful when
# diagnosing TensorRT engine / driver mismatches on the deployment host.
print("CUDA version:", torch.version.cuda)
print("cuDNN version:", torch.backends.cudnn.version())

DESCRIPTION = "YOLOv11n 1280 not quantisized model with batch 1 inference on TensorRT"
ROUTE = "/image"


def parse_boxes(annotation_string):
    """Parse multiple boxes from a single annotation string.

    Each box is 5 whitespace-separated values:
    ``class_id, x_center, y_center, width, height`` (YOLO normalized format).

    Returns a list of ``[x_center, y_center, width, height]`` boxes; the
    class_id of each record is discarded. A trailing partial record (fewer
    than 5 remaining values) is ignored.
    """
    values = [float(x) for x in annotation_string.strip().split()]
    boxes = []
    for i in range(0, len(values), 5):
        if i + 5 <= len(values):
            # Skip class_id (first value of the record); keep the 4 geometry values.
            boxes.append(values[i + 1:i + 5])
    return boxes


def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO-format boxes.

    Boxes are ``(x_center, y_center, width, height)``. The small epsilon in
    the denominator keeps the division safe when both boxes have zero area.
    """

    def yolo_to_corners(box):
        # (x_center, y_center, w, h) -> (x1, y1, x2, y2)
        x_center, y_center, width, height = box
        return np.array([
            x_center - width / 2,
            y_center - height / 2,
            x_center + width / 2,
            y_center + height / 2,
        ])

    box1_corners = yolo_to_corners(box1)
    box2_corners = yolo_to_corners(box2)

    # Intersection rectangle, clamped to zero area when the boxes don't overlap.
    x1 = max(box1_corners[0], box2_corners[0])
    y1 = max(box1_corners[1], box2_corners[1])
    x2 = min(box1_corners[2], box2_corners[2])
    y2 = min(box1_corners[3], box2_corners[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    box1_area = (box1_corners[2] - box1_corners[0]) * (box1_corners[3] - box1_corners[1])
    box2_area = (box2_corners[2] - box2_corners[0]) * (box2_corners[3] - box2_corners[1])
    union = box1_area + box2_area - intersection
    return intersection / (union + 1e-6)


def compute_max_iou(true_boxes, pred_box):
    """Return the maximum IoU between ``pred_box`` and any box in ``true_boxes``.

    Returns 0 when ``true_boxes`` is empty.
    """
    max_iou = 0
    for true_box in true_boxes:
        max_iou = max(max_iou, compute_iou(true_box, pred_box))
    return max_iou


def load_model(path_to_model, model_type="YOLO"):
    """Load a detection model from disk.

    Only the Ultralytics ``YOLO`` loader is supported; any other
    ``model_type`` raises ``NotImplementedError``.
    """
    if model_type == "YOLO":
        return YOLO(path_to_model)
    raise NotImplementedError


def get_boxes_list(predictions):
    """Extract normalized xywh boxes from an Ultralytics results object."""
    return [box.tolist() for box in predictions.boxes.xywhn]


@router.post(ROUTE, tags=["Image Task"], description=DESCRIPTION)
async def evaluate_image(request: ImageEvaluationRequest):
    """
    Evaluate image classification and object detection for forest fire smoke.

    Current model: YOLOv11n (1280 px input) via Ultralytics; an ONNX Runtime
    path exists (``Inference`` / ``predict``) but is not used by default.

    Metrics:
    - Classification accuracy / precision / recall: whether an image contains smoke
    - Object detection: mean IoU between the single predicted box and the
      ground-truth boxes, computed only over images that contain smoke
    """
    username, space_url = get_space_info()

    # Load and prepare the dataset.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))
    test_dataset = dataset["test"]

    # Start tracking emissions for the inference task.
    tracker.start()
    tracker.start_task("inference")

    # ----------------------------------------------------------------------
    # MODEL INFERENCE
    # ----------------------------------------------------------------------
    # Imported lazily so the module stays importable without OpenCV / ORT.
    import cv2
    import onnxruntime

    PATH_TO_MODEL = 'models/best_YOLOv11n_1280_half_batch_1.engine'
    INFERENCE_ENGINE_TYPE = 'pt'
    INPUT_SIZE = 1280
    N_TEST_BATCHES = 2
    BATCH_SIZE = 1  # Can be adjusted as needed
    print("PATH_TO_MODEL", PATH_TO_MODEL)

    def preprocessor(frame):
        """Resize + normalize a frame into a (1, C, H, W) float32 tensor."""
        x = cv2.resize(frame, (INPUT_SIZE, INPUT_SIZE))
        image_data = np.array(x).astype(np.float32) / 255.0  # normalize to [0, 1]
        image_data = np.transpose(image_data, (2, 0, 1))  # (H, W, C) -> (C, H, W)
        return np.expand_dims(image_data, axis=0)  # add batch dimension

    class Inference:
        """Minimal ONNX Runtime detector (used when INFERENCE_ENGINE_TYPE == 'onnx')."""

        def __init__(self, model, image):
            self.session = onnxruntime.InferenceSession(
                model,
                providers=["CPUExecutionProvider"],
                # providers=["CUDAExecutionProvider"]
            )
            input_shape = self.session.get_inputs()[0].shape
            self.image = image
            # FIX: the ONNX detector input is NCHW, so shape[2] is HEIGHT and
            # shape[3] is WIDTH. The original assignment was swapped -- harmless
            # for this square (1280x1280) model but wrong for rectangular inputs.
            self.input_height = input_shape[2]
            self.input_width = input_shape[3]
            self.classes = {0: 'smoke'}

        def detector(self, image_data):
            ort = onnxruntime.OrtValue.ortvalue_from_numpy(image_data)
            return self.session.run(["output0"], {"images": ort})

        def postprocessor(self, results, frame, confidence, iou):
            """Decode raw YOLO output rows into NMS-filtered normalized boxes."""
            img_height, img_width = frame.shape[:2]
            outputs = np.transpose(np.squeeze(results[0]))
            rows = outputs.shape[0]
            boxes = []
            scores = []
            class_ids = []
            final_boxes = []
            final_scores = []
            x_factor = img_width / self.input_width
            y_factor = img_height / self.input_height
            max_of_max_scores = 0
            for i in range(rows):
                classes_scores = outputs[i][4:]
                max_score = np.amax(classes_scores)
                if max_score >= confidence:
                    class_id = np.argmax(classes_scores)
                    x, y, w, h = outputs[i][0], outputs[i][1], outputs[i][2], outputs[i][3]
                    # Scale back to the source image, then normalize to [0, 1].
                    left = int(x * x_factor) / img_width
                    top = int(y * y_factor) / img_height
                    width = int(w * x_factor) / img_width
                    height = int(h * y_factor) / img_height
                    class_ids.append(class_id)
                    scores.append(max_score)
                    boxes.append([left, top, width, height])
                    max_of_max_scores = max(max_of_max_scores, max_score)
            # Non-maximum suppression to drop overlapping detections.
            indices = cv2.dnn.NMSBoxes(boxes, scores, confidence, iou)
            for i in indices:
                final_boxes.append(boxes[i])
                final_scores.append(scores[i])
            return frame, final_boxes, final_scores

        def pipeline(self):
            if isinstance(self.image, str):
                frame = cv2.imread(self.image)
            else:
                frame = np.array(self.image)
            preprocessed = preprocessor(frame)
            detected = self.detector(preprocessed)
            frame, boxes, scores = self.postprocessor(detected, frame, 0.20, 0.20)
            return frame, boxes, scores

    def predict(inference_engine_type, image, path_to_model=None, model=None):
        """Run one image through either the PyTorch/Ultralytics or the ONNX path."""
        if inference_engine_type == 'pt':
            print("INFO - Using pytorch model")
            assert model is not None
            res = model.predict(image, imgsz=INPUT_SIZE)[0]
            return res.boxes.xywhn.tolist(), res.boxes.conf.tolist()
        elif inference_engine_type == 'onnx':
            assert path_to_model is not None
            print("INFO -Using onnx model")
            inference_engine = Inference(path_to_model, image)
            _, boxes, scores = inference_engine.pipeline()
            return boxes, scores
        else:
            raise ValueError(f"Invalid inference engine type: {inference_engine_type}")

    print("Starting inference")
    true_labels = []
    true_boxes_list = []  # one list of ground-truth boxes per smoke image
    n_examples = len(test_dataset)
    n_boxes = []

    model = load_model(PATH_TO_MODEL)
    print("PATH_TO_MODEL", PATH_TO_MODEL)

    # First pass: parse annotations into binary labels + raw annotation strings.
    has_smoke_list = []
    annotations_list = []
    for i, example in enumerate(test_dataset):
        if i % 200 == 0:
            print(f"Processing annotations {i+1} of {n_examples}")
        annotation = example.get("annotations", "").strip()
        n_boxes.append(len(annotation.split("\n")))
        has_smoke = len(annotation) > 0
        has_smoke_list.append(has_smoke)
        true_labels.append(int(has_smoke))
        annotations_list.append(annotation)

    all_preds = []
    all_scores = []
    all_binary_classifications = []

    # Second pass: batched model predictions.
    n_batches = (n_examples + BATCH_SIZE - 1) // BATCH_SIZE
    for i, batch_start in enumerate(range(0, n_examples, BATCH_SIZE)):
        batch_end = min(batch_start + BATCH_SIZE, n_examples)
        if i % 100 == 0:
            print(f"Processing batch {batch_start//BATCH_SIZE + 1} of {n_batches}")
            print(f"Batch start: {batch_start}, Batch end: {batch_end}")

        batch_images = [test_dataset[j]['image'] for j in range(batch_start, batch_end)]

        # Pad a short final batch by repeating its last image so the engine
        # always sees a full batch; padded results are discarded below.
        if len(batch_images) < BATCH_SIZE:
            print(f"Padding last batch from {len(batch_images)} to {BATCH_SIZE} images")
            batch_images.extend([batch_images[-1]] * (BATCH_SIZE - len(batch_images)))

        print("Running predictions")
        results = model.predict(batch_images, imgsz=INPUT_SIZE)
        actual_results = results[:batch_end - batch_start]  # drop padding results

        # Only the first detected box per image is kept, for simplicity.
        batch_preds = [x.boxes.xywhn.tolist()[0] if len(x.boxes.xywhn.tolist()) > 0 else []
                       for x in actual_results]
        batch_scores = [x.boxes.conf.tolist()[0] if len(x.boxes.conf.tolist()) > 0 else []
                        for x in actual_results]
        batch_binary_classifications = [int(len(x.boxes.xywhn.tolist()) > 0)
                                        for x in actual_results]

        all_preds += batch_preds
        all_scores += batch_scores
        all_binary_classifications += batch_binary_classifications

        print("Processing predictions")
        if i == N_TEST_BATCHES:
            # Diagnostic: distribution of annotation-line counts per image.
            from collections import Counter
            n_box_distr = Counter(n_boxes)
            print(n_box_distr)

    # Build the (true boxes, predicted box) pairs for IoU, smoke images only.
    pred_boxes = []
    for idx in range(len(all_preds)):
        if has_smoke_list[idx]:
            true_boxes_list.append(parse_boxes(annotations_list[idx]))
            # FIX: replaced a bare `except:` with a plain conditional -- fall
            # back to a degenerate box when the model detected nothing.
            model_preds = all_preds[idx] if len(all_preds[idx]) >= 1 else [0, 0, 0, 0]
            pred_boxes.append(model_preds)
    # FIX: guard against an empty dataset (the loop variable would be unbound).
    if all_preds:
        print("Processing completed with last index", len(all_preds) - 1)

    # ----------------------------------------------------------------------
    # END OF MODEL INFERENCE
    # ----------------------------------------------------------------------

    # Stop tracking emissions.
    emissions_data = tracker.stop_task()

    predictions = all_binary_classifications

    # Classification metrics.
    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    # Mean IoU over smoke images: for each image take the max IoU of the
    # single predicted box against every ground-truth box.
    print("Calculating mean IoU")
    ious = [compute_max_iou(true_boxes, pred_box)
            for true_boxes, pred_box in zip(true_boxes_list, pred_boxes)]
    mean_iou = float(np.mean(ious)) if ious else 0.0
    print("Mean IoU calculated")

    # Prepare results dictionary.
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        # x1000: presumably converts the tracker's kWh / kg figures to
        # Wh / g -- TODO confirm against the emissions tracker's units.
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }
    print("Result returned")
    return results