""" quick_test.py - Hizli end-to-end test scripti. Egitilmis modelleri val setinden secilen ornek goruntuler uzerinde calistirir, JSON ciktisi + annotated overlay PNG uretir. Tek ihtiyacin olan: cd services/ml .venv\\Scripts\\activate (Windows) python quick_test.py Ne yapar: 1. Mini-test (yolo11n-seg, 2 epoch) ile damage segmentation 2. EfficientNet-B0 ile severity classification (her bbox crop'una) 3. Kural-tabanli severity (referans icin) 4. quick_test_out/ altina .json + .overlay.jpg yazar Argument istemiyorsan opsiyonel: python quick_test.py --num 5 --conf 0.15 """ from __future__ import annotations import argparse import json import random from pathlib import Path import cv2 import numpy as np import torch import torch.nn.functional as F from torchvision import transforms from torchvision.models import efficientnet_b0 from ultralytics import YOLO ML_DIR = Path(__file__).resolve().parent def _latest_snapshot() -> Path | None: bundles = ML_DIR / "runs" / "bundles" if not bundles.exists(): return None cands = sorted( (p for p in bundles.iterdir() if p.is_dir() and (p / "_SNAPSHOT_FOR_BUILD").is_dir() and p.name.startswith("full_")), key=lambda p: p.stat().st_mtime, reverse=True, ) return (cands[0] / "_SNAPSHOT_FOR_BUILD") if cands else None _SNAP = _latest_snapshot() # Prefer the production snapshot (full_YYYYMMDD_HHMMSS) when present; fall # back to the legacy mini-test path so this script keeps working in fresh # checkouts that have only trained the smoke run. if _SNAP is not None and (_SNAP / "damage_best.pt").exists(): DEFAULT_DAMAGE_W = _SNAP / "damage_best.pt" else: DEFAULT_DAMAGE_W = ML_DIR / "runs" / "segment" / "runs" / "arac-hasar" / "mini-test" / "weights" / "best.pt" if _SNAP is not None and (_SNAP / "severity_best.pt").exists(): DEFAULT_SEVERITY_W = _SNAP / "severity_best.pt" else: DEFAULT_SEVERITY_W = ML_DIR / "runs" / "severity" / "best.pt" DEFAULT_VAL_DIR = ML_DIR / "data" / "cardd_yolo" / "images" / "val" DEFAULT_OUT = ML_DIR / "quick_test_out" CARDD_CLASSES = ["dent", "scratch", "crack", "glass_shatter", "lamp_broken", "tire_flat"] # severity_classifier.py'den birebir kopya (rule-based referans icin) PART_IMPORTANCE_DEFAULT = 1.0 DAMAGE_TYPE_WEIGHT = { "scratch": 0.6, "dent": 1.0, "crack": 1.4, "glass_shatter": 2.0, "lamp_broken": 1.8, "tire_flat": 1.5, } def rule_based_severity(damage_type: str, area_ratio: float) -> dict: weight = DAMAGE_TYPE_WEIGHT.get(damage_type, 1.0) score = area_ratio * 100.0 * weight * PART_IMPORTANCE_DEFAULT if score < 1.0: level = "hafif" elif score < 4.0: level = "orta" else: level = "agir" return {"level": level, "score": round(score, 4), "method": "rule_based"} class SeverityCNN: """EfficientNet-B0 severity classifier (train_severity.py ile uyumlu).""" def __init__(self, ckpt_path: Path, device: str = "cuda"): self.device = device if torch.cuda.is_available() else "cpu" ckpt = torch.load(ckpt_path, map_location=self.device, weights_only=False) self.classes = ckpt["classes"] self.tr_names = ckpt["tr_names"] self.img_size = ckpt.get("img_size", 224) self.train_val_acc = ckpt.get("val_acc", None) model = efficientnet_b0(weights=None) in_features = model.classifier[1].in_features model.classifier[1] = torch.nn.Linear(in_features, len(self.classes)) model.load_state_dict(ckpt["model_state_dict"]) model.to(self.device).eval() self.model = model self.tf = transforms.Compose([ transforms.ToPILImage(), transforms.Resize((self.img_size, self.img_size)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) def predict_bgr(self, bgr_crop: np.ndarray) -> dict: if bgr_crop is None or bgr_crop.size == 0: return {"level": "hafif", "confidence": 0.0, "method": "cnn_empty"} rgb = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2RGB) x = self.tf(rgb).unsqueeze(0).to(self.device) with torch.no_grad(): logits = self.model(x) probs = F.softmax(logits, dim=1)[0].cpu().numpy() idx = int(np.argmax(probs)) return { "level": self.tr_names[idx], "raw_class": self.classes[idx], "confidence": float(probs[idx]), "method": "cnn", "all_probs": { self.tr_names[i]: float(p) for i, p in enumerate(probs) }, } def crop_padded(image: np.ndarray, bbox, padding: float = 0.15) -> np.ndarray: h, w = image.shape[:2] x1, y1, x2, y2 = bbox bw, bh = x2 - x1, y2 - y1 px, py = int(bw * padding), int(bh * padding) x1 = max(0, int(x1) - px) y1 = max(0, int(y1) - py) x2 = min(w, int(x2) + px) y2 = min(h, int(y2) + py) return image[y1:y2, x1:x2] def annotate(image: np.ndarray, damages: list) -> np.ndarray: overlay = image.copy() palette = { "hafif": (60, 200, 60), "orta": (40, 180, 230), "agir": (40, 40, 230), } for d in damages: x1 = int(d["bbox"]["x1"]); y1 = int(d["bbox"]["y1"]) x2 = int(d["bbox"]["x2"]); y2 = int(d["bbox"]["y2"]) sev = d.get("severity_cnn", {}).get("level", "hafif") color = palette.get(sev, (200, 200, 200)) cv2.rectangle(overlay, (x1, y1), (x2, y2), color, 2) label = f"{d['type']} | cnn={sev} | rule={d['severity_rule']['level']}" (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) cv2.rectangle(overlay, (x1, max(0, y1 - th - 6)), (x1 + tw + 6, y1), color, -1) cv2.putText(overlay, label, (x1 + 3, max(th, y1 - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA) return overlay def run_one(damage_model: YOLO, sev_cnn: SeverityCNN | None, img_path: Path, out_dir: Path, conf: float, imgsz: int) -> dict: image = cv2.imread(str(img_path)) if image is None: return {"image": str(img_path), "error": "read_failed"} h, w = image.shape[:2] pred = damage_model.predict(image, imgsz=imgsz, conf=conf, verbose=False)[0] damages = [] for i, box in enumerate(pred.boxes): cls_id = int(box.cls.item()) damage_type = CARDD_CLASSES[cls_id] if cls_id < len(CARDD_CLASSES) else f"cls_{cls_id}" det_conf = float(box.conf.item()) x1, y1, x2, y2 = box.xyxy[0].tolist() # NOTE: pred.masks.data is at YOLO's internal resolution (typically # 160x160 or imgsz/4), not the original image size. To get an # area-ratio that's comparable to the original frame we either # resize the mask to (h, w) first, or compute the ratio against the # mask's own pixel space — the latter is cheaper and equivalent. if pred.masks is not None and i < len(pred.masks.data): mask_t = pred.masks.data[i] mh, mw = mask_t.shape[-2:] area_pixels = float(mask_t.sum()) # pixels in mask-space area_ratio = area_pixels / float(mh * mw) else: area_pixels = (x2 - x1) * (y2 - y1) area_ratio = area_pixels / (h * w) crop = crop_padded(image, (x1, y1, x2, y2)) sev_cnn_pred = sev_cnn.predict_bgr(crop) if sev_cnn else None sev_rule = rule_based_severity(damage_type, area_ratio) damages.append({ "id": i, "type": damage_type, "detection_confidence": round(det_conf, 4), "bbox": {"x1": x1, "y1": y1, "x2": x2, "y2": y2}, "area_ratio": round(area_ratio, 6), "severity_rule": sev_rule, "severity_cnn": sev_cnn_pred, }) summary = { "image": img_path.name, "width": w, "height": h, "damage_count": len(damages), "classes_detected": sorted({d["type"] for d in damages}), "max_detection_conf": round(max((d["detection_confidence"] for d in damages), default=0.0), 4), } result = {"summary": summary, "damages": damages} out_dir.mkdir(parents=True, exist_ok=True) json_path = out_dir / f"{img_path.stem}.json" json_path.write_text(json.dumps(result, indent=2, ensure_ascii=False)) overlay = annotate(image, damages) overlay_path = out_dir / f"{img_path.stem}.overlay.jpg" cv2.imwrite(str(overlay_path), overlay) return result def main(): ap = argparse.ArgumentParser() ap.add_argument("--damage_weights", type=Path, default=DEFAULT_DAMAGE_W) ap.add_argument("--severity_weights", type=Path, default=DEFAULT_SEVERITY_W) ap.add_argument("--val_dir", type=Path, default=DEFAULT_VAL_DIR) ap.add_argument("--out_dir", type=Path, default=DEFAULT_OUT) ap.add_argument("--num", type=int, default=3, help="Kac val gorseli denesin") # Defaults track the production snapshot when present; otherwise we # assume the legacy mini-test model (imgsz=480, low conf). _is_prod_snap = _SNAP is not None and (_SNAP / "damage_best.pt").exists() _default_conf = 0.25 if _is_prod_snap else 0.15 _default_imgsz = 640 if _is_prod_snap else 480 ap.add_argument("--conf", type=float, default=_default_conf, help="Detection confidence threshold " "(snapshot: 0.25, mini-test: 0.15)") ap.add_argument("--imgsz", type=int, default=_default_imgsz, help="Inference image size (snapshot: 640, mini-test: 480)") ap.add_argument("--seed", type=int, default=42) ap.add_argument("--image", type=Path, default=None, help="Belirli bir gorsel ver, val secimini atla") args = ap.parse_args() if not args.damage_weights.exists(): raise FileNotFoundError( f"Damage agirligi yok: {args.damage_weights}\n" f"Once `python train.py` tamamla.") print(f"[+] Damage model: {args.damage_weights}") damage_model = YOLO(str(args.damage_weights)) sev_cnn = None if args.severity_weights.exists(): print(f"[+] Severity model: {args.severity_weights}") sev_cnn = SeverityCNN(args.severity_weights) print(f" Classes: {sev_cnn.classes} → {sev_cnn.tr_names}") if sev_cnn.train_val_acc is not None: print(f" Egitim val_acc: {sev_cnn.train_val_acc:.3f}") else: print(f"[!] Severity agirligi yok: {args.severity_weights} (atlandi)") if args.image: images = [args.image] else: all_imgs = sorted(args.val_dir.glob("*.jpg")) if not all_imgs: raise FileNotFoundError(f"Val'da gorsel yok: {args.val_dir}") random.seed(args.seed) images = random.sample(all_imgs, min(args.num, len(all_imgs))) print(f"\n[+] {len(images)} gorsel test ediliyor...") print(f" out_dir: {args.out_dir}\n") for img in images: print(f"--- {img.name} ---") res = run_one(damage_model, sev_cnn, img, args.out_dir, conf=args.conf, imgsz=args.imgsz) s = res["summary"] print(f" damage_count: {s['damage_count']}, " f"classes: {s['classes_detected']}, " f"max_conf: {s['max_detection_conf']}") for d in res["damages"]: cnn = d["severity_cnn"] cnn_str = f"{cnn['level']} ({cnn['confidence']:.2f})" if cnn else "n/a" print(f" [{d['id']}] {d['type']} conf={d['detection_confidence']:.2f} " f"area={d['area_ratio']:.4f} " f"rule={d['severity_rule']['level']} cnn={cnn_str}") print() print(f"[OK] Tum sonuclar: {args.out_dir}") if __name__ == "__main__": main()