| | |
| | |
| | |
| | import os |
| | import cv2 |
| | import numpy as np |
| | import torch |
| | from ultralytics import YOLO |
| |
|
| | |
# Paths to the bundled YOLO weight files, resolved relative to this module.
AW_MODEL_PATH = os.path.join(os.path.dirname(__file__), "models", "AW_yolo.pt")
CERVIX_MODEL_PATH = os.path.join(os.path.dirname(__file__), "models", "cervix_yolo.pt")
# Lazily-initialized model singletons; populated on first use by the
# load_aw_model / load_cervix_model helpers below.
aw_model = None
cervix_model = None
| |
|
def load_aw_model():
    """Lazy-load the Acetowhite segmentation model on first use.

    Returns:
        The cached YOLO model instance, or None if loading failed.
    """
    global aw_model
    if aw_model is not None:
        return aw_model

    try:
        print(f"Loading Acetowhite model from: {AW_MODEL_PATH}")
        aw_model = YOLO(AW_MODEL_PATH)
        aw_model.to('cpu')

        # Some exported segmentation checkpoints error out in the Segment
        # head's detect() call; neutralize it with an identity function.
        # NOTE: the lambda is assigned to the *instance*, so it must not
        # take a `self` parameter (the original `lambda self, x: x` would
        # raise TypeError when called as module.detect(x)).
        if hasattr(aw_model.model, 'model'):
            for module in aw_model.model.modules():
                if module.__class__.__name__ == 'Segment':
                    print("Patching Segment head to prevent detect() error")
                    module.detect = lambda x: x

        print("Acetowhite model loaded successfully")
        return aw_model
    except Exception as e:
        print(f"Error loading Acetowhite model: {e}")
        import traceback
        traceback.print_exc()
        return None
| |
|
def load_cervix_model():
    """Lazy-load the Cervix detection model on first use.

    Returns:
        The cached YOLO model instance, or None if loading failed.
    """
    global cervix_model
    if cervix_model is not None:
        return cervix_model

    try:
        print(f"Loading Cervix model from: {CERVIX_MODEL_PATH}")
        cervix_model = YOLO(CERVIX_MODEL_PATH)
        cervix_model.to('cpu')
        print("Cervix model loaded successfully")
        return cervix_model
    except Exception as e:
        print(f"Error loading Cervix model: {e}")
        import traceback
        traceback.print_exc()
        return None
| |
|
| | |
# Contours smaller than this area (in pixels^2) are discarded as noise.
MIN_AREA = 150
# Fraction of a contour's perimeter used as the approxPolyDP epsilon.
SMOOTHING_EPSILON = 0.002
# Default YOLO confidence threshold for acetowhite inference.
DEFAULT_CONF = 0.4
# Inference image size passed to YOLO predict().
IMG_SIZE = 640
| |
|
| | |
def infer_aw_contour(frame, conf_threshold=DEFAULT_CONF):
    """Segment acetowhite regions in a frame and return smoothed contours.

    Args:
        frame: BGR image (numpy array) or None.
        conf_threshold: Minimum confidence for a mask to be kept.

    Returns:
        Dict with: overlay (annotated copy of the frame, or None when
        nothing was drawn), contours (list of {points, area, confidence}),
        detections (count), frame_width, frame_height.
    """
    if frame is None:
        return {
            "overlay": None,
            "contours": [],
            "detections": 0,
            "frame_width": 0,
            "frame_height": 0
        }

    model = load_aw_model()
    if model is None:
        print("Acetowhite model not available")
        return {
            "overlay": None,
            "contours": [],
            "detections": 0,
            "frame_width": frame.shape[1],
            "frame_height": frame.shape[0]
        }

    overlay = frame.copy()
    contours_list = []
    detection_count = 0

    try:
        print(f"Running YOLO prediction on frame shape: {frame.shape}")
        results = model.predict(
            frame,
            conf=conf_threshold,
            imgsz=IMG_SIZE,
            verbose=False,
            device='cpu'
        )

        # predict() may return a list/tuple of Results or a single Results.
        result = results[0] if isinstance(results, (list, tuple)) else results

        print("YOLO prediction complete")

        if hasattr(result, 'masks') and result.masks is not None:
            try:
                # Per-instance polygons in pixel coordinates.
                masks = result.masks.xy
                if len(masks) > 0:
                    print(f"Found {len(masks)} masks")
                    for idx, polygon in enumerate(masks):
                        confidence = float(result.boxes.conf[idx])
                        if confidence < conf_threshold:
                            continue

                        contour = polygon.astype(np.int32)
                        area = cv2.contourArea(contour)

                        # Discard tiny fragments — almost certainly noise.
                        if area < MIN_AREA:
                            continue

                        # Simplify the polygon proportionally to its perimeter.
                        epsilon = SMOOTHING_EPSILON * cv2.arcLength(contour, True)
                        contour = cv2.approxPolyDP(contour, epsilon, True)

                        # Green outline (BGR), 2 px.
                        cv2.polylines(overlay, [contour], isClosed=True, color=(0, 255, 0), thickness=2)

                        contours_list.append({
                            "points": contour.tolist(),
                            "area": float(area),
                            "confidence": round(confidence, 3)
                        })
                        detection_count += 1
            except Exception as mask_err:
                print(f"Could not extract masks: {mask_err}")

    except Exception as e:
        print(f"YOLO prediction error: {e}")
        import traceback
        traceback.print_exc()

    return {
        "overlay": overlay if detection_count > 0 else None,
        "contours": contours_list,
        "detections": detection_count,
        "frame_width": frame.shape[1],
        "frame_height": frame.shape[0]
    }
| |
|
| | |
| | |
| | |
| | import cv2 |
| | import numpy as np |
| | from ultralytics import YOLO |
| | from collections import deque |
| |
|
| | |
# Rolling window of detection hits (1/0) over the last 10 analyzed frames;
# consumed by analyze_video_frame to judge temporal stability.
detect_history = deque(maxlen=10)
| |
|
| | |
def compute_focus(gray_roi):
    """Return a sharpness score in [0, 1] from the Laplacian variance."""
    variance = cv2.Laplacian(gray_roi, cv2.CV_64F).var()
    # A Laplacian variance of 200 or more counts as perfectly sharp.
    return min(variance / 200, 1.0)
| |
|
| |
|
def compute_exposure(gray_roi):
    """Score mean brightness in [0, 1].

    Returns 1.0 inside the 70-180 sweet spot; outside it, the score
    falls linearly with distance from mid-grey (125), floored at 0.
    """
    mean_level = np.mean(gray_roi)
    if 70 <= mean_level <= 180:
        return 1.0
    penalty = abs(mean_level - 125) / 125
    return max(0, 1 - penalty)
| |
|
| |
|
def compute_glare(gray_roi):
    """Score specular glare in the ROI.

    Counts near-saturated pixels (>240) and maps their fraction to a
    coarse tier: <1% clean (1.0), <3% mild (0.7), <6% moderate (0.4),
    otherwise severe (0.1).
    """
    _, saturated = cv2.threshold(gray_roi, 240, 255, cv2.THRESH_BINARY)
    ratio = np.sum(saturated == 255) / gray_roi.size

    if ratio < 0.01:
        return 1.0
    if ratio < 0.03:
        return 0.7
    if ratio < 0.06:
        return 0.4
    return 0.1
| |
|
| | |
| |
|
def _quality_failure(detection_conf=0.0):
    # Shared payload for frames where quality cannot be scored.
    return {
        "detected": False,
        "detection_confidence": detection_conf,
        "quality_score": 0.0,
        "quality_percent": 0
    }


def analyze_frame(frame, conf_threshold=0.3):
    """Detect the cervix in a frame and score image quality inside its box.

    Args:
        frame: BGR image (numpy array) or None.
        conf_threshold: YOLO confidence threshold for detection.

    Returns:
        Dict with detected flag, detection_confidence, quality_score
        (0..1), quality_percent and, on success, the individual
        focus/exposure/glare sub-scores.
    """
    if frame is None:
        return _quality_failure()

    model = load_cervix_model()
    if model is None:
        print("Cervix model not loaded")
        return _quality_failure()

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    try:
        results = model.predict(
            frame,
            conf=conf_threshold,
            imgsz=640,
            verbose=False,
            device='cpu'
        )
    except Exception as e:
        print(f"Cervix model prediction error: {e}")
        import traceback
        traceback.print_exc()
        return _quality_failure()

    r = results[0]

    if r.boxes is None or len(r.boxes) == 0:
        return _quality_failure()

    # Use only the first (top-ranked) detection.
    box = r.boxes.xyxy.cpu().numpy()[0].astype(int)
    detection_conf = float(r.boxes.conf.cpu().numpy()[0])

    x1, y1, x2, y2 = box
    roi = gray[y1:y2, x1:x2]

    # A degenerate box yields an empty ROI; nothing to score.
    if roi.size == 0:
        return _quality_failure(detection_conf)

    focus_n = compute_focus(roi)
    exposure_n = compute_exposure(roi)
    glare_n = compute_glare(roi)

    # Weighted blend; focus and glare dominate slightly over exposure.
    quality_score = (
        0.35 * focus_n +
        0.30 * exposure_n +
        0.35 * glare_n
    )

    return {
        "detected": True,
        "detection_confidence": round(detection_conf, 3),
        "quality_score": round(float(quality_score), 3),
        "quality_percent": int(quality_score * 100),
        "focus_score": round(float(focus_n), 3),
        "exposure_score": round(float(exposure_n), 3),
        "glare_score": round(float(glare_n), 3)
    }
| |
|
| | |
| |
|
def analyze_video_frame(frame, conf_threshold=0.3):
    """Analyze one video frame and attach a temporal-stability status.

    Stability is judged over the rolling detect_history window
    (last 10 frames): >=7 hits is stable, >=1 unstable, 0 searching.
    """
    result = analyze_frame(frame, conf_threshold)
    detect_history.append(1 if result["detected"] else 0)
    hits = sum(detect_history)

    if hits >= 7:
        status = "Cervix Detected (Stable)"
    elif hits:
        status = "Cervix Detected (Unstable)"
    else:
        status = "Searching Cervix"

    result["status"] = status
    return result
| |
|
| | |
| | |
| | |
| |
|
def infer_cervix_bbox(frame, conf_threshold=0.4):
    """
    Detect cervix bounding boxes using the YOLO model.

    Args:
        frame: Input image frame (BGR) or None.
        conf_threshold: Confidence threshold for detection.

    Returns:
        Dict with: overlay (annotated frame, or None when nothing was
        drawn), bounding_boxes (list of box dicts with coordinates,
        size, center and confidence), detections (count), frame_width,
        frame_height.
    """
    if frame is None:
        return {
            "overlay": None,
            "bounding_boxes": [],
            "detections": 0,
            "frame_width": 0,
            "frame_height": 0
        }

    model = load_cervix_model()
    if model is None:
        # Report the real frame size even on failure, consistent with
        # infer_aw_contour's model-unavailable path.
        return {
            "overlay": None,
            "bounding_boxes": [],
            "detections": 0,
            "frame_width": frame.shape[1],
            "frame_height": frame.shape[0]
        }

    try:
        results = model.predict(
            frame,
            conf=conf_threshold,
            imgsz=640,
            verbose=False,
            device='cpu'
        )[0]

        overlay = frame.copy()
        bounding_boxes = []
        detection_count = 0

        if results.boxes is not None and len(results.boxes) > 0:
            boxes = results.boxes.xyxy.cpu().numpy()
            confidences = results.boxes.conf.cpu().numpy()

            for idx, box in enumerate(boxes):
                x1, y1, x2, y2 = box.astype(int)
                confidence = float(confidences[idx])

                # Blue rectangle (BGR color order), 3 px thick.
                cv2.rectangle(
                    overlay,
                    (x1, y1),
                    (x2, y2),
                    (255, 0, 0),
                    3
                )

                bounding_boxes.append({
                    "x1": int(x1),
                    "y1": int(y1),
                    "x2": int(x2),
                    "y2": int(y2),
                    "width": int(x2 - x1),
                    "height": int(y2 - y1),
                    "confidence": round(confidence, 3),
                    "center_x": int((x1 + x2) / 2),
                    "center_y": int((y1 + y2) / 2)
                })

                detection_count += 1

        return {
            "overlay": overlay if detection_count > 0 else None,
            "bounding_boxes": bounding_boxes,
            "detections": detection_count,
            "frame_width": frame.shape[1],
            "frame_height": frame.shape[0]
        }

    except Exception as e:
        print(f"Cervix bounding box detection error: {e}")
        import traceback
        traceback.print_exc()
        return {
            "overlay": None,
            "bounding_boxes": [],
            "detections": 0,
            "frame_width": frame.shape[1],
            "frame_height": frame.shape[0]
        }
| |
|
| |
|