"""Multi-engine OCR pipeline for technical drawings (P&IDs).

Runs Tesseract, EasyOCR and DocTR over a drawing image, merges and
classifies the detected text against engineering tag patterns, and writes
an annotated image plus a JSON summary of the detections.
"""

import os
import json
import io
import re
import logging
import traceback
from pathlib import Path

import cv2
import numpy as np
import pytesseract
import easyocr
from PIL import Image, ImageDraw, ImageFont
from doctr.models import ocr_predictor

from storage import StorageInterface

# Module-level logger: the detect_* helpers log errors even when this module
# is imported as a library (previously `logger` only existed under __main__,
# so the except branches raised NameError on import).
logger = logging.getLogger(__name__)

# Pre-bind to None so the detect_* helpers can detect a failed model load
# instead of hitting NameError when initialization raised.
doctr_model = None
easyocr_reader = None

# Initialize models
try:
    doctr_model = ocr_predictor(pretrained=True)
    easyocr_reader = easyocr.Reader(['en'])
    logging.info("All OCR models loaded successfully")
except Exception as e:
    logging.error(f"Error loading OCR models: {e}")

# Combined patterns from all approaches.
# NOTE(review): classify_text() uses re.match (prefix match), so order
# matters — broad patterns like Pipe_Connector must stay last.
TEXT_PATTERNS = {
    'Line_Number': r"(?:\d{1,5}[-](?:[A-Z]{2,4})[-]\d{1,3})",
    'Equipment_Tag': r"(?:[A-Z]{1,3}[-][A-Z0-9]{1,4}[-]\d{1,3})",
    'Instrument_Tag': r"(?:\d{2,3}[-][A-Z]{2,4}[-]\d{2,3})",
    'Valve_Number': r"(?:[A-Z]{1,2}[-]\d{3})",
    'Pipe_Size': r"(?:\d{1,2}[\"])",
    'Flow_Direction': r"(?:FROM|TO)",
    'Service_Description': r"(?:STEAM|WATER|AIR|GAS|DRAIN)",
    'Process_Instrument': r"(?:[0-9]{2,3}(?:-[A-Z]{2,3})?-[0-9]{2,3}|[A-Z]{2,3}-[0-9]{2,3})",
    'Nozzle': r"(?:N[0-9]{1,2}|MH)",
    'Pipe_Connector': r"(?:[0-9]{1,5}|[A-Z]{1,2}[0-9]{2,5})"
}


def detect_text_combined(image, confidence_threshold=0.3):
    """Run all three OCR engines, merge overlaps, classify and filter.

    Args:
        image: Image accepted by the underlying engines (numpy array / PIL).
        confidence_threshold: Minimum confidence (0-1) for a detection to
            be kept after merging.

    Returns:
        List of detection dicts with keys 'text', 'bbox', 'confidence',
        'source' and 'text_type'.
    """
    results = []

    # 1. Tesseract Detection
    for result in detect_with_tesseract(image):
        result['source'] = 'tesseract'
        results.append(result)

    # 2. EasyOCR Detection
    for result in detect_with_easyocr(image):
        result['source'] = 'easyocr'
        results.append(result)

    # 3. DocTR Detection
    for result in detect_with_doctr(image):
        result['source'] = 'doctr'
        results.append(result)

    # Merge overlapping detections (keeps highest-confidence per cluster)
    merged_results = merge_overlapping_detections(results)

    # Classify and filter results
    classified_results = []
    for result in merged_results:
        if result['confidence'] >= confidence_threshold:
            result['text_type'] = classify_text(result['text'])
            classified_results.append(result)

    return classified_results


def generate_detailed_summary(results):
    """Aggregate detections (as returned by detect_text_combined) into a
    detailed summary: totals, per-type and per-source breakdowns, and a
    confidence histogram.

    Each input dict must carry 'text', 'bbox', 'confidence', 'source' and
    'text_type'.
    """
    summary = {
        'total_detections': len(results),
        'by_type': {},
        'by_source': {
            'tesseract': {'count': 0, 'by_type': {}, 'avg_confidence': 0.0},
            'easyocr': {'count': 0, 'by_type': {}, 'avg_confidence': 0.0},
            'doctr': {'count': 0, 'by_type': {}, 'avg_confidence': 0.0}
        },
        'confidence_ranges': {
            '0.9-1.0': 0,
            '0.8-0.9': 0,
            '0.7-0.8': 0,
            '0.6-0.7': 0,
            '0.5-0.6': 0,
            '<0.5': 0
        },
        'detected_items': []
    }

    # Initialize per-type counters (and mirror them under each source)
    for pattern_type in TEXT_PATTERNS.keys():
        summary['by_type'][pattern_type] = {
            'count': 0,
            'avg_confidence': 0.0,
            'by_source': {'tesseract': 0, 'easyocr': 0, 'doctr': 0},
            'items': []
        }
        for source in summary['by_source'].keys():
            summary['by_source'][source]['by_type'][pattern_type] = 0

    # Process each detection
    source_confidences = {'tesseract': [], 'easyocr': [], 'doctr': []}
    for result in results:
        source = result['source']
        conf = result['confidence']
        text_type = result['text_type']

        # Update source statistics
        summary['by_source'][source]['count'] += 1
        source_confidences[source].append(conf)

        # Update confidence histogram
        if conf >= 0.9:
            summary['confidence_ranges']['0.9-1.0'] += 1
        elif conf >= 0.8:
            summary['confidence_ranges']['0.8-0.9'] += 1
        elif conf >= 0.7:
            summary['confidence_ranges']['0.7-0.8'] += 1
        elif conf >= 0.6:
            summary['confidence_ranges']['0.6-0.7'] += 1
        elif conf >= 0.5:
            summary['confidence_ranges']['0.5-0.6'] += 1
        else:
            summary['confidence_ranges']['<0.5'] += 1

        # Update type statistics ('Unknown' types are counted in totals
        # and detected_items but not in by_type, matching TEXT_PATTERNS)
        if text_type in summary['by_type']:
            type_stats = summary['by_type'][text_type]
            type_stats['count'] += 1
            type_stats['by_source'][source] += 1
            summary['by_source'][source]['by_type'][text_type] += 1
            type_stats['items'].append({
                'text': result['text'],
                'confidence': conf,
                'source': source,
                'bbox': result['bbox']
            })

        # Add to flat list of detected items
        summary['detected_items'].append({
            'text': result['text'],
            'type': text_type,
            'confidence': conf,
            'source': source,
            'bbox': result['bbox']
        })

    # Calculate average confidences per source
    for source, confs in source_confidences.items():
        if confs:
            summary['by_source'][source]['avg_confidence'] = sum(confs) / len(confs)

    # Calculate average confidences per type
    for text_type, stats in summary['by_type'].items():
        if stats['items']:
            stats['avg_confidence'] = (
                sum(item['confidence'] for item in stats['items']) / len(stats['items'])
            )

    return summary


def process_drawing(image_path, results_dir, storage=None):
    """Run the full OCR pipeline on one drawing and persist the results.

    Writes ``<stem>_detected_texts.jpg`` (annotated image) and
    ``<stem>_detected_texts.json`` (summary + detections) into results_dir.

    Args:
        image_path: Path to the drawing image readable by cv2.
        results_dir: Existing directory to write outputs into.
        storage: Unused; kept for interface compatibility with callers.

    Returns:
        Tuple ``(paths_and_results, text_summary)`` on success, or
        ``(None, None)`` on any failure (the error is logged).
    """
    try:
        # Read image using cv2
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image from {image_path}")

        # Create annotated copy
        annotated_image = image.copy()

        # Initialize results and summary
        text_results = {'file_name': image_path, 'detections': []}
        text_summary = {
            'total_detections': 0,
            'by_source': {
                'tesseract': {'count': 0, 'avg_confidence': 0.0},
                'easyocr': {'count': 0, 'avg_confidence': 0.0},
                'doctr': {'count': 0, 'avg_confidence': 0.0}
            },
            'by_type': {
                'equipment_tag': {'count': 0, 'avg_confidence': 0.0},
                'line_number': {'count': 0, 'avg_confidence': 0.0},
                'instrument_tag': {'count': 0, 'avg_confidence': 0.0},
                'valve_number': {'count': 0, 'avg_confidence': 0.0},
                'pipe_size': {'count': 0, 'avg_confidence': 0.0},
                'flow_direction': {'count': 0, 'avg_confidence': 0.0},
                'service_description': {'count': 0, 'avg_confidence': 0.0},
                'process_instrument': {'count': 0, 'avg_confidence': 0.0},
                'nozzle': {'count': 0, 'avg_confidence': 0.0},
                'pipe_connector': {'count': 0, 'avg_confidence': 0.0},
                'other': {'count': 0, 'avg_confidence': 0.0}
            }
        }

        # Run OCR with the different engines
        all_detections = []
        all_detections.extend((res, 'tesseract') for res in detect_with_tesseract(image))
        all_detections.extend((res, 'easyocr') for res in detect_with_easyocr(image))
        all_detections.extend((res, 'doctr') for res in detect_with_doctr(image))

        # Process each detection
        for detection, source in all_detections:
            # Classify the text and map to the lowercase summary keys;
            # anything unmatched falls into 'other'. (Fixes the original
            # code, which declared by_type but never updated it.)
            type_key = classify_text(detection['text']).lower()
            if type_key not in text_summary['by_type']:
                type_key = 'other'

            # Record the detection
            text_results['detections'].append({
                'text': detection['text'],
                'bbox': detection['bbox'],
                'confidence': detection['confidence'],
                'source': source,
                'text_type': type_key
            })

            # Update summary statistics (avg_confidence holds running sums
            # until the normalization pass below)
            text_summary['total_detections'] += 1
            text_summary['by_source'][source]['count'] += 1
            text_summary['by_source'][source]['avg_confidence'] += detection['confidence']
            text_summary['by_type'][type_key]['count'] += 1
            text_summary['by_type'][type_key]['avg_confidence'] += detection['confidence']

            # Draw detection on image
            x1, y1, x2, y2 = detection['bbox']
            cv2.rectangle(annotated_image, (int(x1), int(y1)), (int(x2), int(y2)),
                          (0, 255, 0), 2)
            cv2.putText(annotated_image, detection['text'], (int(x1), int(y1) - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        # Turn the running sums into averages
        for stats in text_summary['by_source'].values():
            if stats['count'] > 0:
                stats['avg_confidence'] /= stats['count']
        for stats in text_summary['by_type'].values():
            if stats['count'] > 0:
                stats['avg_confidence'] /= stats['count']

        # Save results with the established naming convention
        base_name = Path(image_path).stem
        text_result_image_path = os.path.join(results_dir, f"{base_name}_detected_texts.jpg")
        text_result_json_path = os.path.join(results_dir, f"{base_name}_detected_texts.json")

        # Save the annotated image
        success = cv2.imwrite(text_result_image_path, annotated_image)
        if not success:
            raise ValueError(f"Failed to save image to {text_result_image_path}")

        # Save the JSON results
        with open(text_result_json_path, 'w', encoding='utf-8') as f:
            json.dump({
                'file_name': image_path,
                'summary': text_summary,
                'detections': text_results['detections']
            }, f, indent=4, ensure_ascii=False)

        return {
            'image_path': text_result_image_path,
            'json_path': text_result_json_path,
            'results': text_results
        }, text_summary

    except Exception as e:
        logger.error(f"Error in process_drawing: {str(e)}")
        traceback.print_exc()
        return None, None


def detect_with_tesseract(image):
    """Detect text using Tesseract OCR.

    Returns a list of dicts with 'text', 'bbox' [x1, y1, x2, y2] and
    'confidence' normalized to 0-1. Returns [] on engine failure.
    """
    # Configure Tesseract for technical drawings
    custom_config = r'--oem 3 --psm 11 -c tessedit_char_whitelist="ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-.()" -c tessedit_write_images=true -c textord_heavy_nr=true -c textord_min_linesize=3'
    try:
        data = pytesseract.image_to_data(
            image,
            config=custom_config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data['text'])):
            conf = float(data['conf'][i])
            if conf > 30:  # Lower confidence threshold for technical text
                text = data['text'][i].strip()
                if text:
                    x, y, w, h = data['left'][i], data['top'][i], data['width'][i], data['height'][i]
                    results.append({
                        'text': text,
                        'bbox': [x, y, x + w, y + h],
                        'confidence': conf / 100.0  # Tesseract reports 0-100
                    })
        return results
    except Exception as e:
        logger.error(f"Tesseract error: {str(e)}")
        return []


def detect_with_easyocr(image):
    """Detect text using EasyOCR.

    Returns a list of dicts with 'text', 'bbox' [x1, y1, x2, y2] and
    'confidence' (0-1). Returns [] when the reader failed to load or the
    engine errors.
    """
    if easyocr_reader is None:
        return []
    try:
        results = easyocr_reader.readtext(
            np.array(image),
            paragraph=False,
            height_ths=2.0,
            width_ths=2.0,
            contrast_ths=0.2,
            text_threshold=0.5
        )
        parsed_results = []
        for bbox, text, conf in results:
            # EasyOCR returns a 4-point polygon; reduce to axis-aligned box
            x1, y1 = min(p[0] for p in bbox), min(p[1] for p in bbox)
            x2, y2 = max(p[0] for p in bbox), max(p[1] for p in bbox)
            parsed_results.append({
                'text': text,
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': conf
            })
        return parsed_results
    except Exception as e:
        logger.error(f"EasyOCR error: {str(e)}")
        return []


def detect_with_doctr(image):
    """Detect text using DocTR.

    Returns a list of dicts with 'text', 'bbox' [x1, y1, x2, y2] and
    'confidence' (0-1, defaulting to 0.5 when DocTR omits it). Returns []
    when the model failed to load or the engine errors.
    """
    if doctr_model is None:
        return []
    try:
        # Convert PIL image to numpy array
        image_np = np.array(image)

        # Get predictions
        result = doctr_model([image_np])
        doc = result.export()

        # Parse results; geometry is normalized 0-1, scale once up front
        height, width = image_np.shape[:2]
        scale = np.array([width, height])
        results = []
        for page in doc['pages']:
            for block in page['blocks']:
                for line in block['lines']:
                    for word in line['words']:
                        points = np.array(word['geometry']) * scale
                        x1, y1 = points.min(axis=0)
                        x2, y2 = points.max(axis=0)
                        results.append({
                            'text': word['value'],
                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': word.get('confidence', 0.5)
                        })
        return results
    except Exception as e:
        logger.error(f"DocTR error: {str(e)}")
        return []


def merge_overlapping_detections(results, iou_threshold=0.5):
    """Merge overlapping detections from different sources.

    Detections whose boxes overlap with IoU above iou_threshold are
    clustered; only the highest-confidence detection of each cluster is
    kept.
    """
    if not results:
        return []

    def calculate_iou(box1, box2):
        # Intersection-over-union of two [x1, y1, x2, y2] boxes
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        if x2 < x1 or y2 < y1:
            return 0.0
        intersection = (x2 - x1) * (y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection
        return intersection / union if union > 0 else 0

    merged = []
    used = set()
    for i, r1 in enumerate(results):
        if i in used:
            continue
        current_group = [r1]
        used.add(i)
        for j, r2 in enumerate(results):
            if j in used:
                continue
            if calculate_iou(r1['bbox'], r2['bbox']) > iou_threshold:
                current_group.append(r2)
                used.add(j)
        if len(current_group) == 1:
            merged.append(current_group[0])
        else:
            # Keep the detection with highest confidence
            merged.append(max(current_group, key=lambda x: x['confidence']))
    return merged


def classify_text(text):
    """Classify text against TEXT_PATTERNS; returns the first matching
    pattern name or 'Unknown'.

    NOTE(review): re.match anchors only at the start, so a broad pattern
    can claim a longer tag — pattern order in TEXT_PATTERNS is significant.
    """
    if not text:
        return 'Unknown'

    # Clean and normalize text
    text = text.strip().upper()
    text = re.sub(r'\s+', '', text)

    for text_type, pattern in TEXT_PATTERNS.items():
        if re.match(pattern, text):
            return text_type
    return 'Unknown'


def annotate_image(image, results):
    """Create an annotated PIL image with color-coded detections.

    Args:
        image: PIL Image (converted to RGB in place if needed).
        results: Detection dicts with 'bbox', 'text', 'confidence' and
            optionally 'text_type'.

    Returns:
        The annotated PIL image.
    """
    # Convert image to RGB mode to ensure color support
    if image.mode != 'RGB':
        image = image.convert('RGB')

    # Create drawing object
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("arial.ttf", 20)
    except IOError:
        font = ImageFont.load_default()

    # Define colors for different text types
    colors = {
        'Line_Number': "#FF0000",         # Bright Red
        'Equipment_Tag': "#00FF00",       # Bright Green
        'Instrument_Tag': "#0000FF",      # Bright Blue
        'Valve_Number': "#FFA500",        # Bright Orange
        'Pipe_Size': "#FF00FF",           # Bright Magenta
        'Process_Instrument': "#00FFFF",  # Bright Cyan
        'Nozzle': "#FFFF00",              # Yellow
        'Pipe_Connector': "#800080",      # Purple
        'Unknown': "#FF4444"              # Light Red (also default fallback)
    }

    # Draw detections
    for result in results:
        text_type = result.get('text_type', 'Unknown')
        color = colors.get(text_type, colors['Unknown'])

        # Draw bounding box
        draw.rectangle(result['bbox'], outline=color, width=3)

        # Create label
        label = f"{result['text']} ({result['confidence']:.2f})"
        if text_type != 'Unknown':
            label += f" [{text_type}]"

        # Draw label background then text above the box
        text_bbox = draw.textbbox((result['bbox'][0], result['bbox'][1] - 20), label, font=font)
        draw.rectangle(text_bbox, fill="#FFFFFF")
        draw.text((result['bbox'][0], result['bbox'][1] - 20), label, fill=color, font=font)

    return image


def save_annotated_image(image, path, storage):
    """Save annotated image with maximum quality (lossless PNG) through
    the storage backend."""
    image_byte_array = io.BytesIO()
    image.save(
        image_byte_array,
        format='PNG',
        optimize=False,
        compress_level=0  # no compression: fastest write, largest file
    )
    storage.save_file(path, image_byte_array.getvalue())


if __name__ == "__main__":
    from storage import StorageFactory

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Initialize storage
    storage = StorageFactory.get_storage()

    # Test file paths
    file_path = "processed_pages/10219-1-DG-BC-00011.01-REV_A_page_1_text.png"
    result_path = "results"

    try:
        # Ensure result directory exists
        os.makedirs(result_path, exist_ok=True)

        # Process the drawing
        logger.info(f"Processing file: {file_path}")
        results, summary = process_drawing(file_path, result_path, storage)
        # process_drawing returns (None, None) on failure — bail out
        # instead of crashing on the report below.
        if results is None or summary is None:
            raise RuntimeError(f"process_drawing failed for {file_path}")

        # Print detailed results
        print("\n=== DETAILED DETECTION RESULTS ===")
        print(f"\nTotal Detections: {summary['total_detections']}")

        print("\nBreakdown by Text Type:")
        print("-" * 50)
        for text_type, stats in summary['by_type'].items():
            if stats['count'] > 0:
                print(f"\n{text_type}:")
                print(f"  Count: {stats['count']}")
                print(f"  Average Confidence: {stats['avg_confidence']:.2f}")
                # 'items' only exists in generate_detailed_summary() output
                items = stats.get('items', [])
                if items:
                    print("  Items:")
                    for item in items:
                        print(f"    - {item['text']} (conf: {item['confidence']:.2f}, source: {item['source']})")

        print("\nBreakdown by OCR Engine:")
        print("-" * 50)
        for source, stats in summary['by_source'].items():
            # Values are stat dicts, not bare counts (original printed the
            # whole dict here)
            print(f"{source}: {stats['count']} detections (avg conf: {stats['avg_confidence']:.2f})")

        # Confidence histogram is only present in summaries produced by
        # generate_detailed_summary()
        confidence_ranges = summary.get('confidence_ranges')
        if confidence_ranges:
            print("\nConfidence Distribution:")
            print("-" * 50)
            for range_name, count in confidence_ranges.items():
                print(f"{range_name}: {count} detections")

        # Print output paths
        print("\nOutput Files:")
        print("-" * 50)
        print(f"Annotated Image: {results['image_path']}")
        print(f"JSON Results: {results['json_path']}")

    except Exception as e:
        logger.error(f"Error processing file: {e}")
        raise