"""Detect symbols on a diagram image with one or more YOLO models.

The module loads an image through a storage backend, runs every model listed
in ``MODEL_PATHS``, merges duplicate detections across models, draws the
surviving detections on the original image, and saves an annotated PNG plus a
JSON summary through the same storage backend.
"""

import json
import logging
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO

from storage import StorageInterface

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Constants
MODEL_PATHS = {
    "model1": "models/Intui_SDM_41.pt",
    "model2": "models/Intui_SDM_20.pt"  # Add your second model path here
}
MAX_DIMENSION = 1280
CONFIDENCE_THRESHOLDS = [0.1, 0.3, 0.5, 0.7, 0.9]
# OpenCV drawing functions take colors in BGR order.
TEXT_COLOR = (0, 0, 255)  # Red text (BGR)
BOX_COLOR = (255, 0, 0)  # Blue box (BGR), drawn without transparency
BG_COLOR = (255, 255, 255, 0.6)  # Semi-transparent white for text background (currently unused)
THICKNESS = 1  # Thin text thickness
BOX_THICKNESS = 2  # Box line thickness
MIN_FONT_SCALE = 0.2  # Minimum font scale
MAX_FONT_SCALE = 1.0  # Maximum font scale
TEXT_PADDING = 20  # Padding between text elements
OVERLAP_THRESHOLD = 0.3  # Threshold for detecting text overlap (currently unused)


def preprocess_image_for_symbol_detection(image_cv: np.ndarray) -> np.ndarray:
    """Return an edge map of ``image_cv`` as a 3-channel image.

    The image is converted to grayscale, histogram-equalized, smoothed with a
    bilateral filter, passed through Canny edge detection, and converted back
    to BGR.
    """
    gray = cv2.cvtColor(image_cv, cv2.COLOR_BGR2GRAY)
    equalized = cv2.equalizeHist(gray)
    filtered = cv2.bilateralFilter(equalized, 9, 75, 75)
    edges = cv2.Canny(filtered, 100, 200)
    return cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)


def evaluate_detections(detections_list: List[Dict[str, Any]]) -> int:
    """Score a list of detections; the score is simply the detection count."""
    return len(detections_list)


def resize_image_with_aspect_ratio(
    image_cv: np.ndarray,
    max_dimension: int
) -> Tuple[np.ndarray, int, int]:
    """Shrink ``image_cv`` so its longer side is at most ``max_dimension``.

    Images that already fit are returned unchanged.

    Returns:
        ``(image, width, height)`` where width and height describe the
        returned image.
    """
    original_height, original_width = image_cv.shape[:2]
    longest_side = max(original_width, original_height)
    if longest_side <= max_dimension:
        return image_cv, original_width, original_height

    scale = max_dimension / float(longest_side)
    new_width = int(original_width * scale)
    new_height = int(original_height * scale)
    resized = cv2.resize(image_cv, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    return resized, new_width, new_height


def _calculate_iou(box1, box2) -> float:
    """Return the Intersection over Union of two ``[x1, y1, x2, y2]`` boxes."""
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - intersection

    return intersection / union if union > 0 else 0


def merge_detections(all_detections: List[Dict]) -> List[Dict]:
    """Merge detections from all models, dropping lower-confidence duplicates.

    Two detections are duplicates when they share the same ``original_label``
    and their boxes have an IoU above 0.5; only the one with the highest
    confidence is kept.

    Args:
        all_detections: Detection dicts with ``confidence``, ``bbox`` and
            ``original_label`` keys. The list itself is not modified.

    Returns:
        The surviving detections, ordered by descending confidence.
    """
    if not all_detections:
        return []

    # Work on a sorted copy so the caller's list keeps its order.
    ranked = sorted(all_detections, key=lambda det: det['confidence'], reverse=True)
    keep = [True] * len(ranked)

    for i, current in enumerate(ranked):
        if not keep[i]:
            continue

        current_box = current['bbox']
        current_label = current['original_label']

        for j in range(i + 1, len(ranked)):
            if not keep[j]:
                continue

            candidate = ranked[j]
            # Check if same label type and high IoU
            if (candidate['original_label'] == current_label and
                    _calculate_iou(current_box, candidate['bbox']) > 0.5):
                # The list is sorted by confidence, so i never ranks below j.
                keep[j] = False
                logging.info(f"Removing duplicate detection of {current_label} with lower confidence "
                             f"({candidate['confidence']:.2f} < {current['confidence']:.2f})")

    return [det for det, kept in zip(ranked, keep) if kept]


def calculate_font_scale(image_width: int, bbox_width: int) -> float:
    """Calculate a font scale from the image width and the box width.

    The result is always clamped to ``[MIN_FONT_SCALE, MAX_FONT_SCALE]``. A
    non-positive ``image_width`` yields ``MIN_FONT_SCALE`` instead of raising
    ``ZeroDivisionError``.
    """
    if image_width <= 0:
        return MIN_FONT_SCALE

    base_scale = 0.7  # Base scale chosen for visibility

    # Adjust font size based on image width and bbox width
    width_ratio = image_width / MAX_DIMENSION
    bbox_ratio = bbox_width / image_width

    adaptive_scale = base_scale * max(width_ratio, 0.5) * max(bbox_ratio * 6, 0.7)

    # Ensure font scale stays within reasonable bounds
    return min(max(adaptive_scale, MIN_FONT_SCALE), MAX_FONT_SCALE)


def check_overlap(rect1, rect2):
    """Return True if two ``(x1, y1, x2, y2)`` rectangles overlap or touch."""
    x1_1, y1_1, x2_1, y2_1 = rect1
    x1_2, y1_2, x2_2, y2_2 = rect2

    return not (x2_1 < x1_2 or x1_1 > x2_2 or y2_1 < y1_2 or y1_1 > y2_2)


def draw_annotation(
    image: np.ndarray,
    bbox: List[int],
    text: str,
    confidence: float,
    model_source: str,
    existing_annotations: List[tuple] = None
) -> None:
    """Draw a bounding box and its label on ``image`` in place.

    The label (``text`` plus the confidence as a percentage) is drawn without
    a background. Candidate positions above and below the box are tried in
    turn; the first one that lies inside the image and does not overlap a
    previously placed label is used. If none fits, the label is placed to the
    right of the box.

    Args:
        image: Image to draw on; modified in place.
        bbox: ``[x1, y1, x2, y2]`` box in image pixel coordinates.
        text: Label text.
        confidence: Confidence expressed as a percentage (e.g. ``87.0``).
        model_source: Name of the model that produced the detection
            (accepted for interface compatibility; not drawn).
        existing_annotations: Rectangles of labels already placed. The
            rectangle used for this label is appended to it.
    """
    if existing_annotations is None:
        existing_annotations = []

    x1, y1, x2, y2 = bbox
    bbox_width = x2 - x1
    image_height, image_width = image.shape[:2]

    # Calculate adaptive font scale
    font_scale = calculate_font_scale(image_width, bbox_width)

    # Simplify the annotation text
    annotation_text = f'{text}\n{confidence:.0f}%'
    lines = annotation_text.split('\n')

    # Calculate text dimensions
    font = cv2.FONT_HERSHEY_SIMPLEX
    max_width = 0
    total_height = 0
    line_heights = []

    for line in lines:
        (width, height), baseline = cv2.getTextSize(
            line, font, font_scale, THICKNESS
        )
        max_width = max(max_width, width)
        line_height = height + baseline + TEXT_PADDING
        line_heights.append(line_height)
        total_height += line_height

    # Calculate initial text position
    padding = TEXT_PADDING
    rect_x1 = max(0, x1 - padding)
    rect_x2 = min(image_width, x1 + max_width + padding * 2)

    # Try different positions to avoid overlap
    positions = [
        ('top', y1 - total_height - padding),
        ('bottom', y2 + padding),
        ('top_shifted', y1 - total_height - padding * 2),
        ('bottom_shifted', y2 + padding * 2)
    ]

    final_position = None
    for pos_name, y_pos in positions:
        if y_pos < 0 or y_pos + total_height > image_height:
            continue

        rect = (rect_x1, y_pos, rect_x2, y_pos + total_height)
        if not any(check_overlap(rect, placed) for placed in existing_annotations):
            final_position = (pos_name, y_pos)
            existing_annotations.append(rect)
            break

    # If no non-overlapping position found, use side position
    if final_position is None:
        rect_x1 = max(0, x1 + bbox_width + padding)
        rect_x2 = min(image_width, rect_x1 + max_width + padding * 2)
        y_pos = y1
        final_position = ('side', y_pos)
        # Record the fallback placement too, so later labels can avoid it.
        existing_annotations.append((rect_x1, y_pos, rect_x2, y_pos + total_height))

    rect_y1 = final_position[1]

    # Draw bounding box (no transparency)
    cv2.rectangle(image, (x1, y1), (x2, y2), BOX_COLOR, BOX_THICKNESS)

    # Draw text directly without background
    text_y = rect_y1 + line_heights[0] - padding
    for i, line in enumerate(lines):
        cv2.putText(
            image,
            line,
            (rect_x1 + padding, text_y + sum(line_heights[:i])),
            font,
            font_scale,
            TEXT_COLOR,
            THICKNESS,
            cv2.LINE_AA
        )


def _split_label(label: str) -> Tuple[str, str, str]:
    """Split a model label on ``_`` into ``(category, type, short_label)``.

    ``A_B_C_D`` -> ``("A", "B", "C_D")``; ``A_B`` -> ``("A", "B", "B")``;
    ``A`` -> ``("A", "Unknown", "A")``.
    """
    parts = label.split('_')
    if len(parts) >= 3:
        return parts[0], parts[1], '_'.join(parts[2:])
    if len(parts) == 2:
        return parts[0], parts[1], parts[1]
    return parts[0], "Unknown", parts[0]


def _collect_detections(
    results,
    model_name: str,
    scale_x: float,
    scale_y: float
) -> List[Dict[str, Any]]:
    """Convert model results into detection dicts in original-image pixels."""
    detections = []
    for result in results:
        for box in result.boxes:
            confidence = float(box.conf[0])
            x1, y1, x2, y2 = map(float, box.xyxy[0])
            class_id = int(box.cls[0])
            label = result.names[class_id]

            # Map the box from the (possibly resized) detection image back
            # onto the original image.
            x1, y1, x2, y2 = map(
                int, [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
            )

            category, type_, new_label = _split_label(label)
            detections.append({
                "symbol_id": str(uuid.uuid4()),
                "class_id": class_id,
                "original_label": label,
                "category": category,
                "type": type_,
                "label": new_label,
                "confidence": confidence,
                "bbox": [x1, y1, x2, y2],
                "model_source": model_name
            })
    return detections


def _detect_with_model(
    model_name: str,
    model_path: str,
    image_cv: np.ndarray,
    scale_x: float,
    scale_y: float
) -> List[Dict[str, Any]]:
    """Run one model and return the detections for its best threshold.

    Inference runs once; each threshold in ``CONFIDENCE_THRESHOLDS`` then
    filters the same set of detections and is scored with
    ``evaluate_detections``.
    """
    model = YOLO(model_path)
    # NOTE(review): predict() is called without an explicit `conf`, so the
    # library's own default confidence cut-off applies before the thresholds
    # below are evaluated - confirm whether `conf=min(CONFIDENCE_THRESHOLDS)`
    # was intended.
    results = model.predict(source=image_cv, imgsz=MAX_DIMENSION)
    candidates = _collect_detections(results, model_name, scale_x, scale_y)

    best_detections_list: List[Dict[str, Any]] = []
    best_metric = -1
    for confidence_threshold in CONFIDENCE_THRESHOLDS:
        logging.info(f"Running detection with confidence threshold: {confidence_threshold}...")
        detections_list = [
            det for det in candidates if det["confidence"] >= confidence_threshold
        ]
        metric = evaluate_detections(detections_list)
        if metric > best_metric:
            best_metric = metric
            best_detections_list = detections_list
    return best_detections_list


def run_detection_with_optimal_threshold(
    image_path: str,
    results_dir: str = "results",
    file_name: str = "",
    apply_preprocessing: bool = False,
    resize_image: bool = True,  # Changed default to True
    storage: Optional[StorageInterface] = None
) -> Tuple[str, Optional[str], Optional[str], Optional[List[int]]]:
    """Run detection with multiple models and merge results.

    Args:
        image_path: Path of the input image, as understood by ``storage``.
        results_dir: Directory (in ``storage``) that receives the outputs.
        file_name: Name used to derive the output file names; its extension
            is stripped.
        apply_preprocessing: Run edge-based preprocessing before detection.
        resize_image: Downscale the image to ``MAX_DIMENSION`` before
            detection; boxes are mapped back to original coordinates.
        storage: Storage backend used to load the image and save results.

    Returns:
        On success: ``(annotated_image_path, json_path, log_message,
        diagram_bbox)`` where ``diagram_bbox`` is the ``[x1, y1, x2, y2]``
        envelope of all merged detections (all zeros when there are none).
        On any failure the error is logged and
        ``("Error during detection", None, None, None)`` is returned.
    """
    try:
        if storage is None:
            raise ValueError("A storage backend is required to run detection")

        image_data = storage.load_file(image_path)
        nparr = np.frombuffer(image_data, np.uint8)
        original_image_cv = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if original_image_cv is None:
            # imdecode returns None when the buffer is not a decodable image.
            raise ValueError(f"Could not decode image: {image_path}")
        image_cv = original_image_cv.copy()

        if resize_image:
            logging.info("Resizing image for detection with aspect ratio...")
            image_cv, resized_width, resized_height = resize_image_with_aspect_ratio(
                image_cv, MAX_DIMENSION
            )
        else:
            logging.info("Skipping image resizing...")
            resized_height, resized_width = original_image_cv.shape[:2]

        if apply_preprocessing:
            logging.info("Preprocessing image for symbol detection...")
            image_cv = preprocess_image_for_symbol_detection(image_cv)
        else:
            logging.info("Skipping image preprocessing for symbol detection...")

        # Factors that map detection-image coordinates back to the original.
        scale_x = original_image_cv.shape[1] / resized_width
        scale_y = original_image_cv.shape[0] / resized_height

        all_detections = []

        # Run detection with each model
        for model_name, model_path in MODEL_PATHS.items():
            logging.info(f"Running detection with model: {model_name}")
            if not model_path:
                logging.warning(f"No model path found for {model_name}")
                continue

            all_detections.extend(
                _detect_with_model(model_name, model_path, image_cv, scale_x, scale_y)
            )

        # Merge detections from both models
        merged_detections = merge_detections(all_detections)
        logging.info(f"Total detections after merging: {len(merged_detections)}")

        # Draw annotations on the image
        existing_annotations = []
        for det in merged_detections:
            draw_annotation(
                original_image_cv,
                det["bbox"],
                det["original_label"],
                det["confidence"] * 100,
                det["model_source"],
                existing_annotations
            )

        # Save results
        storage.create_directory(results_dir)
        file_name_without_extension = os.path.splitext(file_name)[0]

        # Prepare output JSON
        total_detected_symbols = len(merged_detections)
        class_counts = {}
        for det in merged_detections:
            full_label = det["original_label"]
            class_counts[full_label] = class_counts.get(full_label, 0) + 1

        output_json = {
            "total_detected_symbols": total_detected_symbols,
            "details": class_counts,
            "detections": merged_detections
        }

        # Save JSON and image
        detection_json_path = os.path.join(
            results_dir, f'{file_name_without_extension}_detected_symbols.json'
        )
        storage.save_file(
            detection_json_path,
            json.dumps(output_json, indent=4).encode('utf-8')
        )

        detection_image_path = os.path.join(
            results_dir,
            f'{file_name_without_extension}_detected_symbols.png'
        )

        # PNG is lossless; compression level 0 only trades file size for speed.
        encode_params = [cv2.IMWRITE_PNG_COMPRESSION, 0]
        _, img_encoded = cv2.imencode('.png', original_image_cv, encode_params)
        storage.save_file(detection_image_path, img_encoded.tobytes())

        # Calculate diagram bbox from merged detections
        diagram_bbox = [
            min([det['bbox'][0] for det in merged_detections], default=0),
            min([det['bbox'][1] for det in merged_detections], default=0),
            max([det['bbox'][2] for det in merged_detections], default=0),
            max([det['bbox'][3] for det in merged_detections], default=0)
        ]

        # NOTE(review): an earlier version upscaled the annotated image to a
        # 2000 px minimum width at this point, after it had already been
        # saved, so the result was never used. If upscaling is wanted it must
        # happen before encoding, with the JSON boxes scaled to match.

        return (
            detection_image_path,
            detection_json_path,
            f"Total detections after merging: {total_detected_symbols}",
            diagram_bbox
        )

    except Exception as e:
        # Top-level boundary: log with traceback and report failure through
        # the return value, as callers expect.
        logging.exception(f"An error occurred: {e}")
        return "Error during detection", None, None, None


def main() -> None:
    """Run detection on a sample page and log the resulting paths."""
    from storage import StorageFactory

    uploaded_file_path = "processed_pages/10219-1-DG-BC-00011.01-REV_A_page_1_text.png"
    results_dir = "results"
    apply_symbol_preprocessing = False
    resize_image = True

    storage = StorageFactory.get_storage()

    (
        detection_image_path,
        detection_json_path,
        detection_log_message,
        diagram_bbox
    ) = run_detection_with_optimal_threshold(
        uploaded_file_path,
        results_dir=results_dir,
        file_name=os.path.basename(uploaded_file_path),
        apply_preprocessing=apply_symbol_preprocessing,
        resize_image=resize_image,
        storage=storage
    )

    logging.info("Detection Image Path: %s", detection_image_path)
    logging.info("Detection JSON Path: %s", detection_json_path)
    logging.info("Detection Log Message: %s", detection_log_message)
    logging.info("Diagram BBox: %s", diagram_bbox)
    logging.info("Done!")


if __name__ == "__main__":
    main()