PreviDengueAPI / detect.py
GitHub Actions
Auto-deploy from GitHub
b5c878a
from collections import Counter
import math
import numpy as np
from PIL import Image
from io import BytesIO
from ultralytics import YOLO
def _compute_iou(box_a, box_b):
ax1, ay1, ax2, ay2 = box_a
bx1, by1, bx2, by2 = box_b
inter_x1 = max(ax1, bx1)
inter_y1 = max(ay1, by1)
inter_x2 = min(ax2, bx2)
inter_y2 = min(ay2, by2)
inter_w = max(0.0, inter_x2 - inter_x1)
inter_h = max(0.0, inter_y2 - inter_y1)
inter_area = inter_w * inter_h
area_a = max(0.0, (ax2 - ax1)) * max(0.0, (ay2 - ay1))
area_b = max(0.0, (bx2 - bx1)) * max(0.0, (by2 - by1))
union = area_a + area_b - inter_area
return inter_area / union if union > 0 else 0.0
def _nms_xyxy(boxes, scores, iou_threshold=0.5):
if len(boxes) == 0:
return []
idxs = np.argsort(scores)[::-1]
keep = []
while len(idxs) > 0:
i = idxs[0]
keep.append(i)
if len(idxs) == 1:
break
rest = idxs[1:]
ious = np.array([_compute_iou(boxes[i], boxes[j]) for j in rest])
idxs = rest[ious <= iou_threshold]
return keep
class DengueDetector:
def __init__(self, model_path="./models/detect.pt"):
self.model = YOLO(model_path)
self.names = self.model.names
self.tile_size = 1024
self.default_overlap = 0.2
self.fast_max_side = 3072
self.batch_tiles = 8
try:
if hasattr(self.model, "fuse"):
self.model.fuse()
except Exception:
pass
print("Modelo carregado com as seguintes classes:", self.names)
def calculate_intensity(self, objects):
if not objects:
return 0.0
weights = {
"piscina_suja": 10.0,
"reservatorio_de_agua": 8.0,
"pneu": 6.0,
"lona": 4.0,
"monte_de_lixo": 3.0,
"saco_de_lixo": 2.0,
"piscina_limpa": 1.0
}
total_score = 0.0
first_obj = objects[0]
img_w = first_obj["box"]["original_width"]
img_h = first_obj["box"]["original_height"]
total_img_area = float(img_w * img_h)
if total_img_area == 0:
for obj in objects:
weight = weights.get(obj["class"], 1.0)
confidence = obj["confidence"]
total_score += weight * confidence
return total_score
for obj in objects:
weight = weights.get(obj["class"], 1.0)
confidence = obj["confidence"]
box = obj["box"]
w = box["x2"] - box["x1"]
h = box["y2"] - box["y1"]
obj_area = w * h
relative_area = obj_area / total_img_area
# risco = Peso * Confiança * Área Relativa
risk_contribution = weight * confidence * relative_area
total_score += risk_contribution
return total_score * 100.0
def detect_image(self, image_bytes, fast: bool = True):
img = Image.open(BytesIO(image_bytes)).convert("RGB")
orig_width, orig_height = img.size
scale = 1.0
tile_size = self.tile_size
overlap = self.default_overlap
if fast:
max_side = max(orig_width, orig_height)
if max_side > self.fast_max_side:
scale = self.fast_max_side / float(max_side)
new_w = max(1, int(round(orig_width * scale)))
new_h = max(1, int(round(orig_height * scale)))
img_resized = img.resize((new_w, new_h), resample=Image.BILINEAR)
else:
img_resized = img
else:
img_resized = img
img_np = np.array(img_resized)
height, width = img_np.shape[:2]
stride = max(1, int(tile_size * (1 - overlap)))
def compute_starts(total, size, stride):
starts = list(range(0, max(total - size, 0) + 1, stride))
if len(starts) == 0:
starts = [0]
last = max(total - size, 0)
if starts[-1] != last:
starts.append(last)
return starts
x_starts = compute_starts(width, tile_size, stride)
y_starts = compute_starts(height, tile_size, stride)
tiles = []
origins = []
for y0 in y_starts:
for x0 in x_starts:
x1 = x0
y1 = y0
x2 = min(x0 + tile_size, width)
y2 = min(y0 + tile_size, height)
tile = img_np[y1:y2, x1:x2, :]
if tile.size == 0:
continue
tiles.append(tile)
origins.append((x1, y1))
all_boxes = []
all_scores = []
all_classes = []
if len(tiles) > 0:
bs = max(1, int(self.batch_tiles))
for i in range(0, len(tiles), bs):
batch = tiles[i:i+bs]
batch_origins = origins[i:i+bs]
results = self.model(batch, verbose=False)
for res, (ox, oy) in zip(results, batch_origins):
boxes = res.boxes
if boxes is None or len(boxes) == 0:
continue
class_ids = boxes.cls.tolist()
confidences = boxes.conf.tolist()
xyxy = boxes.xyxy.cpu().numpy() if hasattr(boxes.xyxy, 'cpu') else np.array(boxes.xyxy)
for j in range(len(class_ids)):
bx1, by1, bx2, by2 = map(float, xyxy[j])
all_boxes.append((bx1 + ox, by1 + oy, bx2 + ox, by2 + oy))
all_scores.append(float(confidences[j]))
all_classes.append(int(class_ids[j]))
final_boxes = []
final_scores = []
final_classes = []
all_boxes_np = np.array(all_boxes, dtype=float)
all_scores_np = np.array(all_scores, dtype=float)
all_classes_np = np.array(all_classes, dtype=int)
for cls in set(all_classes_np.tolist()) if len(all_classes_np) else []:
cls_mask = (all_classes_np == cls)
boxes_cls = all_boxes_np[cls_mask]
scores_cls = all_scores_np[cls_mask]
keep = _nms_xyxy(boxes_cls, scores_cls, iou_threshold=0.5)
for k in keep:
final_boxes.append(tuple(boxes_cls[k]))
final_scores.append(float(scores_cls[k]))
final_classes.append(int(cls))
detections = []
class_names = []
for b, s, c in zip(final_boxes, final_scores, final_classes):
x1, y1, x2, y2 = map(float, b)
if scale != 1.0:
inv = 1.0 / scale
x1 *= inv
y1 *= inv
x2 *= inv
y2 *= inv
cname = self.names[int(c)]
if cname == "lona" and s < 0.6:
continue
class_names.append(cname)
detections.append({
"class": cname,
"confidence": round(s, 4),
"box": {
"x1": x1, "y1": y1, "x2": x2, "y2": y2,
"original_width": orig_width, "original_height": orig_height
}
})
counts = Counter(class_names)
intensity_score = self.calculate_intensity(detections)
return {
"total": len(detections),
"contagem": counts,
"objetos": detections,
"intensity_score": intensity_score
}