hasari-api / services /backend /severity_classifier.py
erdoganpeker's picture
v0.3.0 — multimodal vehicle damage MVP
e327f0d
"""
severity_classifier.py
Hasar siddet siniflandirma. Iki strateji destekler:
1. RuleBasedSeverity: maske alani + parca onemi + hasar tipi → skor → siddet
2. CNNSeverity: hasar bolgesini crop → YOLO-cls → siddet
Ensemble: ikisini kombine eder.
Kullanim:
# CNN egit
python severity_classifier.py train --data data/severity_yolo \\
--model yolo26n-cls --epochs 50
# Tek goruntu uzerinde test (her iki yontem)
python severity_classifier.py test \\
--image car.jpg \\
--damage_weights runs/.../damage_best.pt \\
--severity_weights runs/.../severity_best.pt
"""
import argparse
import json
from pathlib import Path
import cv2
import numpy as np
from ultralytics import YOLO
SEVERITY_LEVELS = ["hafif", "orta", "agir"]
# Parca onem katsayilari - hasarin gozukurlugu ve onarim maliyeti agirligi
# 1.0 = ortalama, daha yuksek = daha kritik
PART_IMPORTANCE = {
"front_bumper": 1.2,
"back_bumper": 1.1,
"hood": 1.4,
"front_glass": 1.6, # On cam ciddi
"back_glass": 1.3,
"front_left_door": 1.2,
"front_right_door": 1.2,
"back_left_door": 1.1,
"back_right_door": 1.1,
"front_left_light": 1.3,
"front_right_light": 1.3,
"back_left_light": 1.1,
"back_right_light": 1.1,
"front_light": 1.3,
"back_light": 1.1,
"left_mirror": 0.9,
"right_mirror": 0.9,
"tailgate": 1.2,
"trunk": 1.2,
"wheel": 1.0,
"unknown": 1.0,
}
# Hasar tipi agirligi - kac kat ciddidir
DAMAGE_TYPE_WEIGHT = {
"scratch": 0.6, # En hafif
"dent": 1.0, # Referans
"crack": 1.4, # Cizgisel, yayilir
"glass_shatter": 2.0, # Cam parcalanmasi = aliyo parca degisimi
"lamp_broken": 1.8, # Far kirilmasi
"tire_flat": 1.5, # Lastik
}
class RuleBasedSeverity:
"""Aciklanabilir, kural tabanli siddet skorlama.
score = area_ratio * part_importance * damage_type_weight * 100
Esikler:
score < 1.0 → hafif
1.0 ≤ score < 4.0 → orta
score ≥ 4.0 → agir
"""
THRESHOLDS = [1.0, 4.0]
def predict(self, damage_type, part_name, area_ratio):
damage_w = DAMAGE_TYPE_WEIGHT.get(damage_type, 1.0)
part_w = PART_IMPORTANCE.get(part_name, 1.0)
score = area_ratio * 100.0 * damage_w * part_w
if score < self.THRESHOLDS[0]:
level = "hafif"
elif score < self.THRESHOLDS[1]:
level = "orta"
else:
level = "agir"
return {
"level": level,
"score": round(score, 4),
"confidence": min(1.0, score / 6.0 + 0.5), # heuristik
"method": "rule_based",
"components": {
"area_ratio": area_ratio,
"damage_weight": damage_w,
"part_weight": part_w,
},
}
class CNNSeverity:
"""CNN siddet siniflandirici.
Supports two checkpoint formats:
(a) Ultralytics YOLO-cls .pt (loaded with YOLO())
(b) Custom torchvision checkpoint dict from train_severity.py
with keys: model_state_dict, classes, tr_names, arch, img_size
"""
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)
def __init__(self, weights_path):
import torch
self._torch = torch
self.weights_path = weights_path
self.kind = None # "yolo" or "torchvision"
self.model = None
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.img_size = 224
self.class_names = list(SEVERITY_LEVELS)
self._load(weights_path)
def _load(self, weights_path):
torch = self._torch
# Try torchvision-style checkpoint first
try:
ckpt = torch.load(weights_path, map_location="cpu", weights_only=False)
except Exception:
ckpt = None
if isinstance(ckpt, dict) and "model_state_dict" in ckpt:
self._load_torchvision(ckpt)
return
# Fall back to YOLO loader
self.model = YOLO(weights_path)
self.kind = "yolo"
def _load_torchvision(self, ckpt):
torch = self._torch
from torchvision.models import efficientnet_b0
import torch.nn as nn
arch = ckpt.get("arch", "efficientnet_b0")
if arch != "efficientnet_b0":
raise ValueError(f"Unsupported torchvision arch: {arch}")
model = efficientnet_b0(weights=None)
# Replace classifier head to match training (3 classes)
n_classes = len(ckpt.get("classes", SEVERITY_LEVELS))
in_features = model.classifier[1].in_features
model.classifier[1] = nn.Linear(in_features, n_classes)
model.load_state_dict(ckpt["model_state_dict"])
model.eval()
model.to(self.device)
self.model = model
self.kind = "torchvision"
self.img_size = int(ckpt.get("img_size", 224))
# Prefer Turkish labels (hafif/orta/agir); fall back to raw class names
self.class_names = list(ckpt.get("tr_names", ckpt.get("classes", SEVERITY_LEVELS)))
@staticmethod
def _bgr_to_normalized_tensor(image_crop, img_size: int):
import torch
rgb = cv2.cvtColor(image_crop, cv2.COLOR_BGR2RGB)
rgb = cv2.resize(rgb, (img_size, img_size), interpolation=cv2.INTER_AREA)
arr = rgb.astype(np.float32) / 255.0
arr = (arr - CNNSeverity.IMAGENET_MEAN) / CNNSeverity.IMAGENET_STD
arr = np.transpose(arr, (2, 0, 1)) # HWC -> CHW
return torch.from_numpy(arr).unsqueeze(0)
def predict(self, image_crop):
"""Tek bir BGR crop verir, en yuksek siddet sinifini dondur."""
if image_crop is None or image_crop.size == 0:
return {"level": "hafif", "confidence": 0.0, "method": "cnn_empty"}
if self.kind == "torchvision":
torch = self._torch
try:
x = self._bgr_to_normalized_tensor(image_crop, self.img_size).to(self.device)
with torch.inference_mode():
logits = self.model(x)
probs = torch.softmax(logits, dim=1)[0].cpu().numpy()
except Exception:
return {"level": "orta", "confidence": 0.0, "method": "cnn_failed"}
idx = int(np.argmax(probs))
cls_names = self.class_names
return {
"level": cls_names[idx],
"confidence": float(probs[idx]),
"method": "cnn_torchvision",
"all_probs": {cls_names[i]: float(p) for i, p in enumerate(probs)},
}
# YOLO-cls path
result = self.model.predict(image_crop, verbose=False)[0]
if not hasattr(result, "probs") or result.probs is None:
return {"level": "orta", "confidence": 0.0, "method": "cnn_failed"}
probs = result.probs.data.cpu().numpy()
idx = int(np.argmax(probs))
cls_names = list(self.model.names.values()) if hasattr(self.model, "names") else SEVERITY_LEVELS
return {
"level": cls_names[idx],
"confidence": float(probs[idx]),
"method": "cnn_yolo",
"all_probs": {cls_names[i]: float(p) for i, p in enumerate(probs)},
}
class EnsembleSeverity:
"""Hibrit: kural tabanli + CNN ensemble. CNN yoksa sadece kural."""
def __init__(self, cnn_weights=None, rule_weight=0.4, cnn_weight=0.6):
self.rule = RuleBasedSeverity()
self.cnn = CNNSeverity(cnn_weights) if cnn_weights else None
self.rule_weight = rule_weight
self.cnn_weight = cnn_weight
def predict(self, damage_type, part_name, area_ratio, image_crop=None):
rule_pred = self.rule.predict(damage_type, part_name, area_ratio)
if self.cnn is None or image_crop is None:
rule_pred["method"] = "rule_only"
return rule_pred
cnn_pred = self.cnn.predict(image_crop)
# Anlasma varsa kolay
if rule_pred["level"] == cnn_pred["level"]:
return {
"level": rule_pred["level"],
"confidence": max(rule_pred["confidence"], cnn_pred["confidence"]),
"method": "ensemble_agree",
"rule": rule_pred,
"cnn": cnn_pred,
}
# Catismada: agirlikli oyla
scores = {lvl: 0.0 for lvl in SEVERITY_LEVELS}
scores[rule_pred["level"]] += self.rule_weight * rule_pred["confidence"]
scores[cnn_pred["level"]] += self.cnn_weight * cnn_pred["confidence"]
if "all_probs" in cnn_pred:
for lvl, p in cnn_pred["all_probs"].items():
if lvl in scores:
scores[lvl] += 0.3 * p
best = max(scores, key=scores.get)
return {
"level": best,
"confidence": scores[best],
"method": "ensemble_resolved",
"rule": rule_pred,
"cnn": cnn_pred,
"ensemble_scores": scores,
}
def crop_damage(image, bbox, padding=0.15):
"""Bbox cevresinde padding'li crop dondur."""
h, w = image.shape[:2]
x1, y1, x2, y2 = bbox
bw, bh = x2 - x1, y2 - y1
px, py = int(bw * padding), int(bh * padding)
x1 = max(0, int(x1) - px)
y1 = max(0, int(y1) - py)
x2 = min(w, int(x2) + px)
y2 = min(h, int(y2) + py)
return image[y1:y2, x1:x2]
def cmd_train(args):
"""YOLO-cls ile siddet modeli egit."""
model = YOLO(f"{args.model}.pt")
model.train(
data=args.data,
epochs=args.epochs,
imgsz=args.imgsz,
batch=args.batch,
device=args.device,
optimizer="AdamW",
lr0=0.001,
weight_decay=0.0001,
patience=20,
plots=True,
project="runs/arac-hasar",
name=f"severity_{args.model}_ep{args.epochs}",
)
def cmd_test(args):
"""Tek goruntude tum siddet siniflandirma yontemlerini dene."""
image = cv2.imread(args.image)
if image is None:
raise FileNotFoundError(args.image)
# Hasar modeli ile bul
damage_model = YOLO(args.damage_weights)
results = damage_model.predict(image, verbose=False)[0]
if not results.boxes:
print("Hasar bulunamadi.")
return
ensemble = EnsembleSeverity(cnn_weights=args.severity_weights)
h, w = image.shape[:2]
output = []
for i, box in enumerate(results.boxes):
cls_id = int(box.cls.item())
damage_type = damage_model.names[cls_id]
bbox = box.xyxy[0].tolist()
# Alan orani: maske varsa maskeden, yoksa bbox'tan.
# results.masks.data is at the model's mask resolution (typ. 160x160
# or imgsz/4), so divide by the mask's own area for a consistent
# ratio. Bbox-based fallback divides by the full image area.
if results.masks is not None and i < len(results.masks.data):
mask_t = results.masks.data[i]
mh, mw = mask_t.shape[-2:]
area_pixels = float(mask_t.sum())
area_ratio = area_pixels / float(mh * mw) if (mh * mw) else 0.0
else:
area_pixels = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
area_ratio = area_pixels / (h * w)
# Parca varsayalim "unknown" (parca pipeline'i pipeline.py'da entegre)
crop = crop_damage(image, bbox)
sev = ensemble.predict(damage_type, "unknown", area_ratio, crop)
output.append({
"damage_id": i,
"type": damage_type,
"area_ratio": round(area_ratio, 5),
"severity": sev,
})
print(json.dumps(output, indent=2, ensure_ascii=False))
def main():
parser = argparse.ArgumentParser()
subs = parser.add_subparsers(dest="cmd")
p_train = subs.add_parser("train")
p_train.add_argument("--data", type=str, required=True,
help="data/severity_yolo (train/val/test alti hafif/orta/agir klasorlu)")
p_train.add_argument("--model", type=str, default="yolo26n-cls")
p_train.add_argument("--epochs", type=int, default=50)
p_train.add_argument("--imgsz", type=int, default=224)
p_train.add_argument("--batch", type=int, default=64)
p_train.add_argument("--device", type=str, default="0")
p_train.set_defaults(func=cmd_train)
p_test = subs.add_parser("test")
p_test.add_argument("--image", type=str, required=True)
p_test.add_argument("--damage_weights", type=str, required=True)
p_test.add_argument("--severity_weights", type=str, default=None)
p_test.set_defaults(func=cmd_test)
args = parser.parse_args()
if hasattr(args, "func"):
args.func(args)
else:
parser.print_help()
if __name__ == "__main__":
main()