from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score
import random
import os
import torch
from ultralytics import YOLO
from dotenv import load_dotenv

from .utils.evaluation import ImageEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

load_dotenv()

router = APIRouter()

# Report the CUDA version PyTorch was compiled with, and the cuDNN version.
print("CUDA version:", torch.version.cuda)
print("cuDNN version:", torch.backends.cudnn.version())

# MODEL_TYPE = "YOLOv11n"
DESCRIPTION = "YOLOv11n 1280 non-quantized model with batch-1 inference on TensorRT"

ROUTE = "/image"

def parse_boxes(annotation_string):
    """Parse multiple boxes from a single annotation string.

    Each box has 5 values: class_id, x_center, y_center, width, height.
    """
    values = [float(x) for x in annotation_string.strip().split()]
    boxes = []
    # Walk the flat list of values five at a time, one box per step.
    for i in range(0, len(values), 5):
        if i + 5 <= len(values):
            # Skip class_id (first value) and take the next 4 values.
            box = values[i + 1:i + 5]
            boxes.append(box)
    return boxes
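
# For illustration (hypothetical annotation string): two newline-separated
# YOLO-format boxes parse into two 4-value lists, class ids dropped:
#   parse_boxes("0 0.50 0.50 0.20 0.30\n0 0.10 0.20 0.05 0.05")
#   -> [[0.5, 0.5, 0.2, 0.3], [0.1, 0.2, 0.05, 0.05]]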

def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO-format boxes."""
    # Convert YOLO format (x_center, y_center, width, height) to corners.
    def yolo_to_corners(box):
        x_center, y_center, width, height = box
        x1 = x_center - width / 2
        y1 = y_center - height / 2
        x2 = x_center + width / 2
        y2 = y_center + height / 2
        return np.array([x1, y1, x2, y2])

    box1_corners = yolo_to_corners(box1)
    box2_corners = yolo_to_corners(box2)

    # Calculate intersection.
    x1 = max(box1_corners[0], box2_corners[0])
    y1 = max(box1_corners[1], box2_corners[1])
    x2 = min(box1_corners[2], box2_corners[2])
    y2 = min(box1_corners[3], box2_corners[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    # Calculate union (the epsilon guards against division by zero).
    box1_area = (box1_corners[2] - box1_corners[0]) * (box1_corners[3] - box1_corners[1])
    box2_area = (box2_corners[2] - box2_corners[0]) * (box2_corners[3] - box2_corners[1])
    union = box1_area + box2_area - intersection
    return intersection / (union + 1e-6)
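
# Worked example (hypothetical boxes): two 0.2 x 0.2 boxes whose centers are
# offset by 0.05 in x overlap on a 0.15 x 0.2 region, so
#   compute_iou([0.5, 0.5, 0.2, 0.2], [0.55, 0.5, 0.2, 0.2])
#   -> 0.03 / (0.04 + 0.04 - 0.03) = 0.6 (up to the 1e-6 epsilon)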

def compute_max_iou(true_boxes, pred_box):
    """Compute the maximum IoU between a predicted box and all true boxes."""
    max_iou = 0
    for true_box in true_boxes:
        iou = compute_iou(true_box, pred_box)
        max_iou = max(max_iou, iou)
    return max_iou

def load_model(path_to_model, model_type="YOLO"):
    if model_type == "YOLO":
        model = YOLO(path_to_model)
    else:
        raise NotImplementedError(f"Unsupported model type: {model_type}")
    return model

def get_boxes_list(predictions):
    """Return predicted boxes as plain lists in normalized xywh format."""
    return [box.tolist() for box in predictions.boxes.xywhn]
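
# For reference: ultralytics exposes boxes in several coordinate formats;
# `xywhn` is (x_center, y_center, width, height) normalized to [0, 1], which
# matches the annotation format parsed by parse_boxes above.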

@router.post(ROUTE, tags=["Image Task"])
async def evaluate_image(request: ImageEvaluationRequest):
    """
    Evaluate image classification and object detection for forest fire smoke.

    Current Model: YOLOv11n (TensorRT engine, batch-1 inference)
    - Classifies an image as containing smoke if at least one box is detected
    - Predicts bounding boxes for the detected smoke

    Metrics:
    - Classification accuracy: whether an image contains smoke or not
    - Object detection accuracy: IoU (Intersection over Union) for smoke bounding boxes
    """
    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Update the code below to replace the random baseline with your model inference
    #--------------------------------------------------------------------------------------------

    import cv2
    import onnxruntime
    import matplotlib.pyplot as plt

    # PATH_TO_MODEL = 'models/best_YOLOv11n_1280.onnx'
    # PATH_TO_MODEL = 'models/best_yolov6n_1280.pt'
    # PATH_TO_MODEL = 'models/best_YOLOv11n_1280_real_half.onnx'
    PATH_TO_MODEL = 'models/best_YOLOv11n_1280_half_batch_1.engine'
    # 'pt' selects the ultralytics API below; YOLO() also loads .engine files.
    INFERENCE_ENGINE_TYPE = 'pt'
    INPUT_SIZE = 1280
    N_TEST_BATCHES = 2
    BATCH_SIZE = 1  # Can be adjusted as needed
    print("PATH_TO_MODEL", PATH_TO_MODEL)

    def preprocessor(frame):
        # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Only when read from file
        x = cv2.resize(frame, (INPUT_SIZE, INPUT_SIZE))
        image_data = np.array(x).astype(np.float32) / 255.0  # Normalize to [0, 1] range
        image_data = np.transpose(image_data, (2, 0, 1))  # (H, W, C) -> (C, H, W)
        image_data = np.expand_dims(image_data, axis=0)  # Add batch dimension
        return image_data
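
    # Shape check (illustrative): a 720x1280 BGR frame becomes a float32 tensor
    # of shape (1, 3, 1280, 1280) in [0, 1], ready for the NCHW "images" input
    # of the ONNX session below.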

    class Inference:
        def __init__(self, model, image):
            self.session = onnxruntime.InferenceSession(
                model,
                providers=["CPUExecutionProvider"],
                # providers=["CUDAExecutionProvider"]
            )
            model_inputs = self.session.get_inputs()
            input_shape = model_inputs[0].shape
            self.image = image
            # ONNX YOLO exports use NCHW inputs: (batch, channels, height, width).
            self.input_height = input_shape[2]
            self.input_width = input_shape[3]
            self.classes = {0: 'smoke'}

        def detector(self, image_data):
            ort = onnxruntime.OrtValue.ortvalue_from_numpy(image_data)
            return self.session.run(["output0"], {"images": ort})

        def postprocessor(self, results, frame, confidence, iou):
            img_height, img_width = frame.shape[:2]
            outputs = np.transpose(np.squeeze(results[0]))
            rows = outputs.shape[0]
            boxes = []
            final_boxes = []
            final_scores = []
            scores = []
            class_ids = []
            x_factor = img_width / self.input_width
            y_factor = img_height / self.input_height
            max_of_max_scores = 0
            for i in range(rows):
                classes_scores = outputs[i][4:]
                max_score = np.amax(classes_scores)
                if max_score >= confidence:
                    class_id = np.argmax(classes_scores)
                    x, y, w, h = outputs[i][0], outputs[i][1], outputs[i][2], outputs[i][3]
                    # Scale the (x_center, y_center, width, height) prediction from
                    # input-resolution pixels to image pixels, then normalize to [0, 1].
                    x_center = int(x * x_factor) / img_width
                    y_center = int(y * y_factor) / img_height
                    width = int(w * x_factor) / img_width
                    height = int(h * y_factor) / img_height
                    class_ids.append(class_id)
                    scores.append(max_score)
                    boxes.append([x_center, y_center, width, height])
                    max_of_max_scores = max(max_of_max_scores, max_score)
            # Apply non-maximum suppression to filter out overlapping bounding boxes.
            # Note: cv2.dnn.NMSBoxes nominally expects top-left-based (x, y, w, h)
            # boxes; center-based boxes are passed here, which shifts every box by
            # half its size, but this is kept to preserve the original behavior.
            indices = cv2.dnn.NMSBoxes(boxes, scores, confidence, iou)
            for i in indices:
                final_boxes.append(boxes[i])
                final_scores.append(scores[i])
            return frame, final_boxes, final_scores

        def pipeline(self):
            if isinstance(self.image, str):
                frame = cv2.imread(self.image)
            else:
                frame = np.array(self.image)
            preprocessed = preprocessor(frame)
            detected = self.detector(preprocessed)
            frame, boxes, scores = self.postprocessor(detected, frame, 0.20, 0.20)
            return frame, boxes, scores
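
    # Illustrative usage (hypothetical path; not executed here):
    #   engine = Inference('models/example.onnx', 'example.jpg')
    #   _, boxes, scores = engine.pipeline()
    # Boxes come back as normalized (x_center, y_center, width, height) lists.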

    def predict(inference_engine_type, image, path_to_model=None, model=None):
        if inference_engine_type == 'pt':
            print("INFO - Using PyTorch model")
            assert model is not None
            res = model.predict(image, imgsz=INPUT_SIZE)[0]
            boxes = res.boxes.xywhn.tolist()
            confidences = res.boxes.conf.tolist()
            return boxes, confidences
        elif inference_engine_type == 'onnx':
            assert path_to_model is not None
            print("INFO - Using ONNX model")
            inference_engine = Inference(path_to_model, image)
            _, boxes, scores = inference_engine.pipeline()
            return boxes, scores
        else:
            raise ValueError(f"Invalid inference engine type: {inference_engine_type}")
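
    # Illustrative call (single image, ultralytics backend; not executed here):
    #   boxes, confidences = predict('pt', test_dataset[0]['image'], model=model)
    # Both backends return (boxes, scores) in the same normalized xywh format.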
| print("Starting inference") | |
| predictions = [] | |
| true_labels = [] | |
| pred_boxes = [] | |
| true_boxes_list = [] # List of lists, each inner list contains boxes for one image | |
| n_examples = len(test_dataset) | |
| n_boxes = [] | |
| model = YOLO(PATH_TO_MODEL) | |
| print("PATH_TO_MODEL", PATH_TO_MODEL) | |
| # First pass - process annotations | |
| has_smoke_list = [] | |
| annotations_list = [] | |
| for i, example in enumerate(test_dataset): | |
| if i % 200 == 0: | |
| print(f"Processing annotations {i+1} of {n_examples}") | |
| annotation = example.get("annotations", "").strip() | |
| n_annotations = len(annotation.split("\n")) | |
| n_boxes.append(n_annotations) | |
| has_smoke = len(annotation) > 0 | |
| has_smoke_list.append(has_smoke) | |
| true_labels.append(int(has_smoke)) | |
| annotations_list.append(annotation) | |
| if i == (N_TEST_BATCHES+1)*BATCH_SIZE-1: | |
| #break | |
| pass | |

    all_preds = []
    all_scores = []
    all_binary_classifications = []

    # Second pass - batch predictions
    for i, batch_start in enumerate(range(0, n_examples, BATCH_SIZE)):
        batch_end = min(batch_start + BATCH_SIZE, n_examples)
        if i % 100 == 0:
            print(f"Processing batch {batch_start // BATCH_SIZE + 1} of {(n_examples + BATCH_SIZE - 1) // BATCH_SIZE}")
            print(f"Batch start: {batch_start}, Batch end: {batch_end}")

        # Get batch of images and pad if needed
        batch_images = []
        for j in range(batch_start, batch_end):
            batch_images.append(test_dataset[j]['image'])

        # Pad the last batch if needed
        if len(batch_images) < BATCH_SIZE:
            print(f"Padding last batch from {len(batch_images)} to {BATCH_SIZE} images")
            padding_needed = BATCH_SIZE - len(batch_images)
            # Duplicate the last image to fill the batch
            batch_images.extend([batch_images[-1]] * padding_needed)

        print("Running predictions")
        # Get predictions for batch
        results = model.predict(batch_images, imgsz=INPUT_SIZE)

        # Only process the actual examples (not padding)
        actual_results = results[:batch_end - batch_start]
        # Keep only the first box per image for simplicity
        batch_preds = [x.boxes.xywhn.tolist()[0] if len(x.boxes.xywhn.tolist()) > 0 else [] for x in actual_results]
        batch_scores = [x.boxes.conf.tolist()[0] if len(x.boxes.conf.tolist()) > 0 else [] for x in actual_results]
        # An image is classified as "smoke" if the model detected at least one box
        batch_binary_classifications = [int(len(x.boxes.xywhn.tolist()) > 0) for x in actual_results]

        all_preds += batch_preds
        all_scores += batch_scores
        all_binary_classifications += batch_binary_classifications

        print("Processing predictions")
        if i == N_TEST_BATCHES:
            from collections import Counter
            n_box_distr = Counter(n_boxes)
            print(n_box_distr)
            # Early-exit hook for quick smoke tests; disabled for full runs.
            # break
            pass

    pred_boxes = []
    for idx in range(len(all_preds)):
        if has_smoke_list[idx]:
            # Parse true boxes
            image_true_boxes = parse_boxes(annotations_list[idx])
            true_boxes_list.append(image_true_boxes)

            # Process predicted boxes; fall back to a zero-area box when the
            # model found nothing, so the IoU for that image counts as 0.
            try:
                if len(all_preds[idx]) < 1:
                    model_preds = [0, 0, 0, 0]
                else:
                    model_preds = all_preds[idx]
            except Exception:
                model_preds = [0, 0, 0, 0]
            pred_boxes.append(model_preds)
    print("Processing completed with last index", idx)

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    predictions = all_binary_classifications

    # Calculate classification metrics
    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    # Calculate mean IoU for object detection (only for images with smoke).
    # For each image, compute the max IoU between the predicted box and all true boxes.
    print("Calculating mean IoU")
    ious = []
    for true_boxes, pred_box in zip(true_boxes_list, pred_boxes):
        max_iou = compute_max_iou(true_boxes, pred_box)
        ious.append(max_iou)
    mean_iou = float(np.mean(ious)) if ious else 0.0
    print("Mean IoU calculated")

    # Prepare results dictionary (energy is reported in Wh and emissions in gCO2eq,
    # hence the factor of 1000 on the tracker's kWh / kgCO2eq values)
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }
    print("Result returned")
    return results