# tasks/image.py
from fastapi import APIRouter
from datetime import datetime
import os

import numpy as np
from datasets import load_dataset
from sklearn.metrics import accuracy_score, precision_score, recall_score
from ultralytics import YOLO
from dotenv import load_dotenv

from .utils.evaluation import ImageEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

load_dotenv()

router = APIRouter()
import torch

# Report the CUDA / cuDNN versions PyTorch was compiled with (useful when
# checking TensorRT engine compatibility).
print("CUDA version:", torch.version.cuda)
print("cuDNN version:", torch.backends.cudnn.version())

DESCRIPTION = "YOLOv11n 1280 non-quantized model with batch 1 inference on TensorRT"

ROUTE = "/image"
def parse_boxes(annotation_string):
    """Parse multiple boxes from a single annotation string.

    Each box has 5 values: class_id, x_center, y_center, width, height.
    """
    values = [float(x) for x in annotation_string.strip().split()]
    boxes = []
    # Each box has 5 values; skip the class_id and keep the 4 coordinates
    for i in range(0, len(values), 5):
        if i + 5 <= len(values):
            box = values[i + 1:i + 5]
            boxes.append(box)
    return boxes
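# Illustrative example (hypothetical annotation string with two boxes):
#   parse_boxes("0 0.5 0.5 0.2 0.1 0 0.3 0.3 0.1 0.1")
#   -> [[0.5, 0.5, 0.2, 0.1], [0.3, 0.3, 0.1, 0.1]]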
def compute_iou(box1, box2):
    """Compute Intersection over Union (IoU) between two YOLO format boxes."""
    # Convert YOLO format (x_center, y_center, width, height) to corners
    def yolo_to_corners(box):
        x_center, y_center, width, height = box
        x1 = x_center - width / 2
        y1 = y_center - height / 2
        x2 = x_center + width / 2
        y2 = y_center + height / 2
        return np.array([x1, y1, x2, y2])

    box1_corners = yolo_to_corners(box1)
    box2_corners = yolo_to_corners(box2)

    # Calculate intersection
    x1 = max(box1_corners[0], box2_corners[0])
    y1 = max(box1_corners[1], box2_corners[1])
    x2 = min(box1_corners[2], box2_corners[2])
    y2 = min(box1_corners[3], box2_corners[3])
    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    # Calculate union (the epsilon avoids division by zero for degenerate boxes)
    box1_area = (box1_corners[2] - box1_corners[0]) * (box1_corners[3] - box1_corners[1])
    box2_area = (box2_corners[2] - box2_corners[0]) * (box2_corners[3] - box2_corners[1])
    union = box1_area + box2_area - intersection
    return intersection / (union + 1e-6)
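# Worked example: a small box fully inside a larger one.
#   compute_iou([0.5, 0.5, 0.2, 0.2], [0.5, 0.5, 0.1, 0.1])
#   intersection = 0.1 * 0.1 = 0.01, union = 0.04 + 0.01 - 0.01 = 0.04
#   -> IoU ~= 0.25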
def compute_max_iou(true_boxes, pred_box):
    """Compute maximum IoU between a predicted box and all true boxes."""
    max_iou = 0
    for true_box in true_boxes:
        iou = compute_iou(true_box, pred_box)
        max_iou = max(max_iou, iou)
    return max_iou
def load_model(path_to_model, model_type="YOLO"):
    if model_type == "YOLO":
        model = YOLO(path_to_model)
    else:
        raise NotImplementedError(f"Unsupported model type: {model_type}")
    return model
def get_boxes_list(predictions):
    """Return normalized (x_center, y_center, width, height) boxes as plain lists."""
    return [box.tolist() for box in predictions.boxes.xywhn]
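# Illustrative output for a single detection (xywhn values are normalized to [0, 1]):
#   get_boxes_list(predictions) -> [[0.52, 0.48, 0.10, 0.06]]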
@router.post(ROUTE, tags=["Image Task"], description=DESCRIPTION)
async def evaluate_image(request: ImageEvaluationRequest):
    """
    Evaluate image classification and object detection for forest fire smoke.

    Current Model: YOLOv11n (1280 input, batch 1, TensorRT engine)

    Metrics:
    - Classification accuracy: whether an image contains smoke or not
    - Object Detection accuracy: IoU (Intersection over Union) for smoke bounding boxes
    """
    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")
    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # --------------------------------------------------------------------------------------------
    import cv2
    import onnxruntime

    # Previously tried checkpoints, kept for reference:
    # PATH_TO_MODEL = 'models/best_YOLOv11n_1280.onnx'
    # PATH_TO_MODEL = 'models/best_yolov6n_1280.pt'
    # PATH_TO_MODEL = 'models/best_YOLOv11n_1280_real_half.onnx'
    PATH_TO_MODEL = 'models/best_YOLOv11n_1280_half_batch_1.engine'
    INFERENCE_ENGINE_TYPE = 'pt'
    INPUT_SIZE = 1280
    N_TEST_BATCHES = 2  # Used by the commented-out debug hooks below
    BATCH_SIZE = 1  # Can be adjusted as needed
    print("PATH_TO_MODEL", PATH_TO_MODEL)
    def preprocessor(frame):
        """Resize to the model input size and convert to NCHW float32 in [0, 1]."""
        # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Only when read from file
        x = cv2.resize(frame, (INPUT_SIZE, INPUT_SIZE))
        image_data = np.array(x).astype(np.float32) / 255.0  # Normalize to [0, 1] range
        image_data = np.transpose(image_data, (2, 0, 1))  # (H, W, C) -> (C, H, W)
        image_data = np.expand_dims(image_data, axis=0)  # Add batch dimension
        return image_data
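    # Shape flow for an illustrative 1920x1080 RGB frame:
    #   (1080, 1920, 3) uint8 -> resize -> (1280, 1280, 3) -> (1, 3, 1280, 1280) float32 in [0, 1]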
    class Inference:
        """Minimal ONNX Runtime wrapper for the single-class smoke detector."""

        def __init__(self, model, image):
            self.session = onnxruntime.InferenceSession(
                model,
                providers=["CPUExecutionProvider"],
                # providers=["CUDAExecutionProvider"]
            )
            model_inputs = self.session.get_inputs()
            input_shape = model_inputs[0].shape
            self.image = image
            # NCHW input: shape is (N, C, H, W)
            self.input_height = input_shape[2]
            self.input_width = input_shape[3]
            self.classes = {0: 'smoke'}

        def detector(self, image_data):
            ort = onnxruntime.OrtValue.ortvalue_from_numpy(image_data)
            return self.session.run(["output0"], {"images": ort})

        def postprocessor(self, results, frame, confidence, iou):
            img_height, img_width = frame.shape[:2]
            # Raw YOLO output is (1, 4 + n_classes, n_anchors); transpose to (n_anchors, 4 + n_classes)
            outputs = np.transpose(np.squeeze(results[0]))
            rows = outputs.shape[0]
            boxes = []
            scores = []
            class_ids = []
            final_boxes = []
            final_scores = []
            x_factor = img_width / self.input_width
            y_factor = img_height / self.input_height
            for i in range(rows):
                classes_scores = outputs[i][4:]
                max_score = np.amax(classes_scores)
                if max_score >= confidence:
                    class_id = np.argmax(classes_scores)
                    x, y, w, h = outputs[i][0], outputs[i][1], outputs[i][2], outputs[i][3]
                    # Scale the box back to the original frame, then normalize to [0, 1]
                    left = int(x * x_factor) / img_width
                    top = int(y * y_factor) / img_height
                    width = int(w * x_factor) / img_width
                    height = int(h * y_factor) / img_height
                    class_ids.append(class_id)
                    scores.append(max_score)
                    boxes.append([left, top, width, height])
            # Apply non-maximum suppression to filter out overlapping bounding boxes
            indices = cv2.dnn.NMSBoxes(boxes, scores, confidence, iou)
            for i in indices:
                final_boxes.append(boxes[i])
                final_scores.append(scores[i])
            return frame, final_boxes, final_scores

        def pipeline(self):
            if isinstance(self.image, str):
                frame = cv2.imread(self.image)
            else:
                frame = np.array(self.image)
            preprocessed = preprocessor(frame)
            detected = self.detector(preprocessed)
            frame, boxes, scores = self.postprocessor(detected, frame, 0.20, 0.20)
            return frame, boxes, scores
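    # Usage sketch for the ONNX path (assuming an .onnx checkpoint such as the
    # commented-out 'models/best_YOLOv11n_1280.onnx' above):
    #   _, boxes, scores = Inference('models/best_YOLOv11n_1280.onnx', image).pipeline()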
    def predict(inference_engine_type, image, path_to_model=None, model=None):
        if inference_engine_type == 'pt':
            print("INFO - Using pytorch model")
            assert model is not None
            res = model.predict(image, imgsz=INPUT_SIZE)[0]
            boxes = res.boxes.xywhn.tolist()
            confidences = res.boxes.conf.tolist()
            return boxes, confidences
        elif inference_engine_type == 'onnx':
            assert path_to_model is not None
            print("INFO - Using onnx model")
            inference_engine = Inference(path_to_model, image)
            _, boxes, scores = inference_engine.pipeline()
            return boxes, scores
        else:
            raise ValueError(f"Invalid inference engine type: {inference_engine_type}")
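    # Example call (note: the batch loop below calls model.predict directly,
    # so this helper is kept for single-image experiments):
    #   boxes, confidences = predict('pt', image, model=model)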
print("Starting inference")
predictions = []
true_labels = []
pred_boxes = []
true_boxes_list = [] # List of lists, each inner list contains boxes for one image
n_examples = len(test_dataset)
n_boxes = []
model = YOLO(PATH_TO_MODEL)
print("PATH_TO_MODEL", PATH_TO_MODEL)
# First pass - process annotations
has_smoke_list = []
annotations_list = []
for i, example in enumerate(test_dataset):
if i % 200 == 0:
print(f"Processing annotations {i+1} of {n_examples}")
annotation = example.get("annotations", "").strip()
n_annotations = len(annotation.split("\n"))
n_boxes.append(n_annotations)
has_smoke = len(annotation) > 0
has_smoke_list.append(has_smoke)
true_labels.append(int(has_smoke))
annotations_list.append(annotation)
if i == (N_TEST_BATCHES+1)*BATCH_SIZE-1:
#break
pass
    all_preds = []
    all_scores = []
    all_binary_classifications = []

    # Second pass - batch predictions
    for i, batch_start in enumerate(range(0, n_examples, BATCH_SIZE)):
        batch_end = min(batch_start + BATCH_SIZE, n_examples)
        if i % 100 == 0:
            print(f"Processing batch {batch_start//BATCH_SIZE + 1} of {(n_examples + BATCH_SIZE - 1)//BATCH_SIZE}")
            print(f"Batch start: {batch_start}, Batch end: {batch_end}")

        # Get batch of images
        batch_images = []
        for j in range(batch_start, batch_end):
            batch_images.append(test_dataset[j]['image'])

        # Pad the last batch if needed (the engine expects a fixed batch size)
        if len(batch_images) < BATCH_SIZE:
            print(f"Padding last batch from {len(batch_images)} to {BATCH_SIZE} images")
            padding_needed = BATCH_SIZE - len(batch_images)
            # Duplicate the last image to fill the batch
            batch_images.extend([batch_images[-1]] * padding_needed)

        # Get predictions for batch
        results = model.predict(batch_images, imgsz=INPUT_SIZE)

        # Only process the actual examples (not padding)
        actual_results = results[:batch_end - batch_start]
        # Keep only the first (highest-confidence) box per image for simplicity
        batch_preds = [x.boxes.xywhn.tolist()[0] if len(x.boxes.xywhn.tolist()) > 0 else [] for x in actual_results]
        batch_scores = [x.boxes.conf.tolist()[0] if len(x.boxes.conf.tolist()) > 0 else [] for x in actual_results]
        batch_binary_classifications = [int(len(x.boxes.xywhn.tolist()) > 0) for x in actual_results]

        all_preds += batch_preds
        all_scores += batch_scores
        all_binary_classifications += batch_binary_classifications

        # Debug hook: uncomment to inspect the box-count distribution and stop early
        # if i == N_TEST_BATCHES:
        #     from collections import Counter
        #     print(Counter(n_boxes))
        #     break
    # Match predictions to ground truth, only for images that contain smoke
    pred_boxes = []
    for idx in range(len(all_preds)):
        if has_smoke_list[idx]:
            # Parse true boxes
            image_true_boxes = parse_boxes(annotations_list[idx])
            true_boxes_list.append(image_true_boxes)

            # Fall back to a zero box when the model detected nothing
            if len(all_preds[idx]) < 1:
                model_preds = [0, 0, 0, 0]
            else:
                model_preds = all_preds[idx]
            pred_boxes.append(model_preds)

    print("Processing completed with last index", idx)
    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    # --------------------------------------------------------------------------------------------
    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    predictions = all_binary_classifications

    # Calculate classification metrics
    classification_accuracy = accuracy_score(true_labels, predictions)
    classification_precision = precision_score(true_labels, predictions)
    classification_recall = recall_score(true_labels, predictions)

    # Calculate mean IoU for object detection (only for images with smoke).
    # For each image, we compute the max IoU between the predicted box and all true boxes.
    print("Calculating mean IoU")
    ious = []
    for true_boxes, pred_box in zip(true_boxes_list, pred_boxes):
        max_iou = compute_max_iou(true_boxes, pred_box)
        ious.append(max_iou)
    mean_iou = float(np.mean(ious)) if ious else 0.0
    print("Mean IoU calculated")
    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "classification_accuracy": float(classification_accuracy),
        "classification_precision": float(classification_precision),
        "classification_recall": float(classification_recall),
        "mean_iou": mean_iou,
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,  # kWh -> Wh
        "emissions_gco2eq": emissions_data.emissions * 1000,  # kgCO2eq -> gCO2eq
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }
    print("Result returned")
    return results