import torch  # NOTE(review): unused in this chunk; kept in case another part of the file needs it
from abc import ABC, abstractmethod
from typing import List, Optional, Dict
import numpy as np
import cv2
from pathlib import Path
from loguru import logger
import json

from common import DetectionResult
from storage import StorageInterface
from utils import DebugHandler, CoordinateTransformer


class BaseConfig(ABC):
    """Abstract base config for all configuration classes."""

    def __post_init__(self):
        """Ensures default values are set correctly for all configs."""
        pass


class BaseDetector(ABC):
    """Abstract base class for detection models."""

    def __init__(self, config: BaseConfig, debug_handler: DebugHandler = None):
        """Store the config and a debug handler (a default one is created if none given)."""
        self.config = config
        self.debug_handler = debug_handler or DebugHandler()

    @abstractmethod
    def _load_model(self, model_path: str):
        """Load and return the detection model."""
        pass

    @abstractmethod
    def detect(self, image: np.ndarray, *args, **kwargs):
        """Run detection on an input image."""
        pass

    @abstractmethod
    def _preprocess(self, image: np.ndarray) -> np.ndarray:
        """Preprocess the input image before detection."""
        pass

    @abstractmethod
    def _postprocess(self, image: np.ndarray) -> np.ndarray:
        """Postprocess the image after detection."""
        pass


class BaseDetectionPipeline(ABC):
    """Abstract base class for detection pipelines."""

    def __init__(
        self,
        storage: StorageInterface,
        debug_handler=None
    ):
        """Wire up storage, debug handling, and coordinate transforms."""
        self.storage = storage
        self.debug_handler = debug_handler or DebugHandler()
        self.transformer = CoordinateTransformer()

    @abstractmethod
    def process_image(
        self,
        image_path: str,
        output_dir: str,
        config
    ) -> DetectionResult:
        """Main processing pipeline for a single image."""
        pass

    def _apply_roi(self, image: np.ndarray, roi: np.ndarray) -> np.ndarray:
        """Apply region of interest cropping.

        `roi` is expected as [x_min, y_min, x_max, y_max]; anything else
        (None or wrong length) returns the image unchanged.
        """
        if roi is not None and len(roi) == 4:
            # Cast to int so float-typed ROI arrays don't break slicing.
            x_min, y_min, x_max, y_max = (int(v) for v in roi)
            return image[y_min:y_max, x_min:x_max]
        return image

    def _adjust_coordinates(self, detections: List[Dict], roi: np.ndarray) -> List[Dict]:
        """Adjust detection coordinates based on ROI.

        Translates each detection's bbox from ROI-local back to full-image
        coordinates using the ROI's top-left corner as the offset.
        Malformed detections are skipped with a warning.
        """
        if roi is None or len(roi) != 4:
            return detections
        x_offset, y_offset = roi[0], roi[1]
        adjusted = []
        for det in detections:
            try:
                bbox = det["bbox"]
                adjusted_bbox = [
                    int(bbox[0] + x_offset),
                    int(bbox[1] + y_offset),
                    int(bbox[2] + x_offset),
                    int(bbox[3] + y_offset)
                ]
                adjusted.append({**det, "bbox": adjusted_bbox})
            # Also guard short bboxes (IndexError) and non-sequence bbox
            # values (TypeError), not just a missing "bbox" key.
            except (KeyError, IndexError, TypeError):
                logger.warning("Invalid detection format during coordinate adjustment")
        return adjusted

    def _persist_results(
        self,
        output_dir: str,
        image_path: str,
        detections: List[Dict],
        annotated_image: Optional[np.ndarray]
    ) -> Dict[str, str]:
        """Save detection results and annotations.

        Writes `<stem>_lines.json` always, and `<stem>_annotated.jpg` when an
        annotated image is provided and JPEG encoding succeeds. Returns a dict
        with "json_path" and "image_path" (None if no image was saved).
        """
        self.storage.create_directory(output_dir)
        base_name = Path(image_path).stem

        # Save JSON results
        json_path = Path(output_dir) / f"{base_name}_lines.json"
        self.storage.save_file(
            str(json_path),
            json.dumps({
                "solid_lines": {"lines": detections},
                "dashed_lines": {"lines": []}
            }, indent=2).encode('utf-8')
        )

        # Save annotated image
        img_path = None
        if annotated_image is not None:
            # Check imencode's success flag: on failure the buffer is unusable
            # and calling .tobytes() on it would raise.
            success, img_data = cv2.imencode('.jpg', annotated_image)
            if success:
                img_path = Path(output_dir) / f"{base_name}_annotated.jpg"
                self.storage.save_file(str(img_path), img_data.tobytes())
            else:
                logger.warning("Failed to encode annotated image for %s", image_path)

        return {
            "json_path": str(json_path),
            "image_path": str(img_path) if img_path else None
        }