# OCR text extraction for engineering drawings (P&ID):
# runs Tesseract, EasyOCR and DocTR, then merges and classifies detected tags.
| import os | |
| import json | |
| import io | |
| from PIL import Image, ImageDraw, ImageFont | |
| import numpy as np | |
| from doctr.models import ocr_predictor | |
| import pytesseract | |
| import easyocr | |
| from storage import StorageInterface | |
| import re | |
| import logging | |
| from pathlib import Path | |
| import cv2 | |
| import traceback | |
# Initialize OCR models once at import time.
# Pre-bind both names to None so the detect_with_* helpers can test
# availability (`is None`) instead of raising NameError when loading fails.
doctr_model = None
easyocr_reader = None
try:
    doctr_model = ocr_predictor(pretrained=True)
    easyocr_reader = easyocr.Reader(['en'])
    logging.info("All OCR models loaded successfully")
except Exception as e:
    logging.error(f"Error loading OCR models: {e}")
# Combined patterns from all approaches.
# Maps a category name to a regex; classify_text() tries them in this
# (insertion) order with re.match, so earlier, more specific patterns win
# over broad ones like 'Pipe_Connector'.
TEXT_PATTERNS = {
    'Line_Number': r"(?:\d{1,5}[-](?:[A-Z]{2,4})[-]\d{1,3})",            # e.g. 123-AB-45
    'Equipment_Tag': r"(?:[A-Z]{1,3}[-][A-Z0-9]{1,4}[-]\d{1,3})",        # e.g. P-101A-1
    'Instrument_Tag': r"(?:\d{2,3}[-][A-Z]{2,4}[-]\d{2,3})",             # e.g. 10-FIC-101
    'Valve_Number': r"(?:[A-Z]{1,2}[-]\d{3})",                           # e.g. V-101
    'Pipe_Size': r"(?:\d{1,2}[\"])",                                     # e.g. 6"
    'Flow_Direction': r"(?:FROM|TO)",
    'Service_Description': r"(?:STEAM|WATER|AIR|GAS|DRAIN)",
    'Process_Instrument': r"(?:[0-9]{2,3}(?:-[A-Z]{2,3})?-[0-9]{2,3}|[A-Z]{2,3}-[0-9]{2,3})",
    'Nozzle': r"(?:N[0-9]{1,2}|MH)",                                     # nozzle number or manhole
    'Pipe_Connector': r"(?:[0-9]{1,5}|[A-Z]{1,2}[0-9]{2,5})"             # broad catch-all, keep last
}
def detect_text_combined(image, confidence_threshold=0.3):
    """Run all three OCR engines, merge duplicates and classify survivors.

    Args:
        image: input image accepted by the underlying OCR engines.
        confidence_threshold: detections below this confidence are dropped.

    Returns:
        list of detection dicts, each tagged with 'source' and 'text_type'.
    """
    detections = []
    engines = (
        ('tesseract', detect_with_tesseract),
        ('easyocr', detect_with_easyocr),
        ('doctr', detect_with_doctr),
    )
    # Collect detections from every engine, recording which one produced each.
    for engine_name, detect in engines:
        for detection in detect(image):
            detection['source'] = engine_name
            detections.append(detection)
    # Collapse detections whose boxes overlap across engines.
    deduplicated = merge_overlapping_detections(detections)
    # Keep only confident detections and attach a pattern classification.
    classified = []
    for detection in deduplicated:
        if detection['confidence'] < confidence_threshold:
            continue
        detection['text_type'] = classify_text(detection['text'])
        classified.append(detection)
    return classified
def generate_detailed_summary(results):
    """Generate detailed detection summary.

    Args:
        results: classified detections as produced by detect_text_combined();
            each dict carries 'text', 'bbox', 'confidence', 'source'
            (one of 'tesseract' / 'easyocr' / 'doctr') and 'text_type'.

    Returns:
        dict with overall counts, per-type and per-source breakdowns,
        a confidence histogram and a flat list of all detected items.
    """
    summary = {
        'total_detections': len(results),
        'by_type': {},
        'by_source': {
            'tesseract': {
                'count': 0,
                'by_type': {},
                'avg_confidence': 0.0
            },
            'easyocr': {
                'count': 0,
                'by_type': {},
                'avg_confidence': 0.0
            },
            'doctr': {
                'count': 0,
                'by_type': {},
                'avg_confidence': 0.0
            }
        },
        'confidence_ranges': {
            '0.9-1.0': 0,
            '0.8-0.9': 0,
            '0.7-0.8': 0,
            '0.6-0.7': 0,
            '0.5-0.6': 0,
            '<0.5': 0
        },
        'detected_items': []
    }
    # Initialize per-type counters for every known pattern category.
    for pattern_type in TEXT_PATTERNS.keys():
        summary['by_type'][pattern_type] = {
            'count': 0,
            'avg_confidence': 0.0,
            'by_source': {
                'tesseract': 0,
                'easyocr': 0,
                'doctr': 0
            },
            'items': []
        }
        # Initialize source-specific type counters
        for source in summary['by_source'].keys():
            summary['by_source'][source]['by_type'][pattern_type] = 0
    # Process each detection
    source_confidences = {'tesseract': [], 'easyocr': [], 'doctr': []}
    for result in results:
        # Get source and confidence
        source = result['source']
        conf = result['confidence']
        text_type = result['text_type']
        # Update source statistics
        summary['by_source'][source]['count'] += 1
        source_confidences[source].append(conf)
        # Bucket the confidence into the histogram (half-open ranges).
        if conf >= 0.9: summary['confidence_ranges']['0.9-1.0'] += 1
        elif conf >= 0.8: summary['confidence_ranges']['0.8-0.9'] += 1
        elif conf >= 0.7: summary['confidence_ranges']['0.7-0.8'] += 1
        elif conf >= 0.6: summary['confidence_ranges']['0.6-0.7'] += 1
        elif conf >= 0.5: summary['confidence_ranges']['0.5-0.6'] += 1
        else: summary['confidence_ranges']['<0.5'] += 1
        # Update type statistics. Types not in TEXT_PATTERNS (e.g. 'Unknown')
        # are skipped here but still counted in totals and detected_items.
        if text_type in summary['by_type']:
            type_stats = summary['by_type'][text_type]
            type_stats['count'] += 1
            type_stats['by_source'][source] += 1
            summary['by_source'][source]['by_type'][text_type] += 1
            type_stats['items'].append({
                'text': result['text'],
                'confidence': conf,
                'source': source,
                'bbox': result['bbox']
            })
        # Add to the flat list of all detections.
        summary['detected_items'].append({
            'text': result['text'],
            'type': text_type,
            'confidence': conf,
            'source': source,
            'bbox': result['bbox']
        })
    # Calculate average confidences per source.
    for source, confs in source_confidences.items():
        if confs:
            summary['by_source'][source]['avg_confidence'] = sum(confs) / len(confs)
    # Calculate average confidences for each type
    for text_type, stats in summary['by_type'].items():
        if stats['items']:
            stats['avg_confidence'] = sum(item['confidence'] for item in stats['items']) / len(stats['items'])
    return summary
def process_drawing(image_path, results_dir, storage=None):
    """Run all three OCR engines on one drawing and persist the results.

    Args:
        image_path: path to the drawing image (readable by cv2).
        results_dir: directory that receives the annotated image and JSON.
        storage: optional storage backend; kept for interface compatibility
            (outputs are currently written to the local filesystem).

    Returns:
        (results, summary) where results holds the output paths and raw
        detections, and summary the per-source / per-type statistics,
        or (None, None) when processing fails.
    """
    try:
        # Read image using cv2
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image from {image_path}")
        # Create annotated copy
        annotated_image = image.copy()
        # Initialize results and summary
        text_results = {
            'file_name': image_path,
            'detections': []
        }
        text_summary = {
            'total_detections': 0,
            'by_source': {
                'tesseract': {'count': 0, 'avg_confidence': 0.0},
                'easyocr': {'count': 0, 'avg_confidence': 0.0},
                'doctr': {'count': 0, 'avg_confidence': 0.0}
            },
            'by_type': {
                'equipment_tag': {'count': 0, 'avg_confidence': 0.0},
                'line_number': {'count': 0, 'avg_confidence': 0.0},
                'instrument_tag': {'count': 0, 'avg_confidence': 0.0},
                'valve_number': {'count': 0, 'avg_confidence': 0.0},
                'pipe_size': {'count': 0, 'avg_confidence': 0.0},
                'flow_direction': {'count': 0, 'avg_confidence': 0.0},
                'service_description': {'count': 0, 'avg_confidence': 0.0},
                'process_instrument': {'count': 0, 'avg_confidence': 0.0},
                'nozzle': {'count': 0, 'avg_confidence': 0.0},
                'pipe_connector': {'count': 0, 'avg_confidence': 0.0},
                'other': {'count': 0, 'avg_confidence': 0.0}
            }
        }
        # Run OCR with each engine and tag every detection with its source.
        all_detections = []
        all_detections.extend([(res, 'tesseract') for res in detect_with_tesseract(image)])
        all_detections.extend([(res, 'easyocr') for res in detect_with_easyocr(image)])
        all_detections.extend([(res, 'doctr') for res in detect_with_doctr(image)])
        # Process each detection
        for detection, source in all_detections:
            # Classify the text and map the pattern name (e.g. 'Line_Number')
            # onto the lower-case summary keys; anything unrecognized
            # (including 'Unknown') is folded into 'other'.
            # Fixes: by_type stats were previously initialized but never updated.
            type_key = classify_text(detection['text']).lower()
            if type_key not in text_summary['by_type']:
                type_key = 'other'
            # Update text_results (the 'type' key is additive to the old schema).
            text_results['detections'].append({
                'text': detection['text'],
                'bbox': detection['bbox'],
                'confidence': detection['confidence'],
                'source': source,
                'type': type_key
            })
            # Accumulate confidence sums; converted to averages below.
            text_summary['total_detections'] += 1
            text_summary['by_source'][source]['count'] += 1
            text_summary['by_source'][source]['avg_confidence'] += detection['confidence']
            text_summary['by_type'][type_key]['count'] += 1
            text_summary['by_type'][type_key]['avg_confidence'] += detection['confidence']
            # Draw detection on image
            x1, y1, x2, y2 = detection['bbox']
            cv2.rectangle(annotated_image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(annotated_image, detection['text'], (int(x1), int(y1)-5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        # Turn the accumulated confidence sums into averages.
        for stats in text_summary['by_source'].values():
            if stats['count'] > 0:
                stats['avg_confidence'] /= stats['count']
        for stats in text_summary['by_type'].values():
            if stats['count'] > 0:
                stats['avg_confidence'] /= stats['count']
        # Save results with the <stem>_detected_texts naming convention.
        base_name = Path(image_path).stem
        text_result_image_path = os.path.join(results_dir, f"{base_name}_detected_texts.jpg")
        text_result_json_path = os.path.join(results_dir, f"{base_name}_detected_texts.json")
        # Save the annotated image
        success = cv2.imwrite(text_result_image_path, annotated_image)
        if not success:
            raise ValueError(f"Failed to save image to {text_result_image_path}")
        # Save the JSON results
        with open(text_result_json_path, 'w', encoding='utf-8') as f:
            json.dump({
                'file_name': image_path,
                'summary': text_summary,
                'detections': text_results['detections']
            }, f, indent=4, ensure_ascii=False)
        return {
            'image_path': text_result_image_path,
            'json_path': text_result_json_path,
            'results': text_results
        }, text_summary
    except Exception as e:
        print(f"Error in process_drawing: {str(e)}")
        traceback.print_exc()
        return None, None
def detect_with_tesseract(image):
    """Detect text using Tesseract OCR tuned for technical drawings.

    Args:
        image: numpy array or PIL image accepted by pytesseract.

    Returns:
        list of {'text', 'bbox' [x1, y1, x2, y2], 'confidence' in [0, 1]}
        dicts; empty list when OCR raises.
    """
    # PSM 11 = sparse text; character whitelist restricted to tag-style glyphs.
    custom_config = r'--oem 3 --psm 11 -c tessedit_char_whitelist="ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-.()" -c tessedit_write_images=true -c textord_heavy_nr=true -c textord_min_linesize=3'
    try:
        data = pytesseract.image_to_data(
            image,
            config=custom_config,
            output_type=pytesseract.Output.DICT
        )
        results = []
        for i in range(len(data['text'])):
            conf = float(data['conf'][i])
            if conf > 30:  # Lower confidence threshold for technical text
                text = data['text'][i].strip()
                if text:
                    x, y, w, h = data['left'][i], data['top'][i], data['width'][i], data['height'][i]
                    results.append({
                        'text': text,
                        'bbox': [x, y, x + w, y + h],
                        'confidence': conf / 100.0  # Tesseract reports 0-100
                    })
        return results
    except Exception as e:
        # Was `logger.error`, but `logger` is only bound inside the
        # __main__ guard; use the logging module directly instead.
        logging.error(f"Tesseract error: {str(e)}")
        return []
def detect_with_easyocr(image):
    """Detect text using EasyOCR.

    Args:
        image: image convertible to a numpy array.

    Returns:
        list of {'text', 'bbox', 'confidence'} dicts; empty list when the
        reader was never loaded or OCR raises.
    """
    if easyocr_reader is None:
        return []
    try:
        results = easyocr_reader.readtext(
            np.array(image),
            paragraph=False,
            height_ths=2.0,
            width_ths=2.0,
            contrast_ths=0.2,
            text_threshold=0.5
        )
        parsed_results = []
        for bbox, text, conf in results:
            # EasyOCR returns a 4-point polygon; reduce it to an
            # axis-aligned [x1, y1, x2, y2] box.
            x1, y1 = min(point[0] for point in bbox), min(point[1] for point in bbox)
            x2, y2 = max(point[0] for point in bbox), max(point[1] for point in bbox)
            parsed_results.append({
                'text': text,
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': conf
            })
        return parsed_results
    except Exception as e:
        # Was `logger.error`, but `logger` is only bound inside the
        # __main__ guard; use the logging module directly instead.
        logging.error(f"EasyOCR error: {str(e)}")
        return []
def detect_with_doctr(image):
    """Detect text using DocTR.

    Args:
        image: image convertible to a numpy array.

    Returns:
        list of {'text', 'bbox', 'confidence'} dicts; empty list when
        prediction fails.
    """
    try:
        # Convert the input (e.g. PIL image) to a numpy array
        image_np = np.array(image)
        # Get predictions
        result = doctr_model([image_np])
        doc = result.export()
        # Walk the page -> block -> line -> word hierarchy of the export.
        results = []
        for page in doc['pages']:
            for block in page['blocks']:
                for line in block['lines']:
                    for word in line['words']:
                        # DocTR geometry is normalized to [0, 1];
                        # scale to absolute pixel coordinates.
                        height, width = image_np.shape[:2]
                        points = np.array(word['geometry']) * np.array([width, height])
                        x1, y1 = points.min(axis=0)
                        x2, y2 = points.max(axis=0)
                        results.append({
                            'text': word['value'],
                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': word.get('confidence', 0.5)
                        })
        return results
    except Exception as e:
        # Was `logger.error`, but `logger` is only bound inside the
        # __main__ guard; use the logging module directly instead.
        logging.error(f"DocTR error: {str(e)}")
        return []
def merge_overlapping_detections(results, iou_threshold=0.5):
    """Collapse detections whose boxes overlap beyond the IoU threshold.

    Greedy grouping: each not-yet-consumed detection anchors a group of all
    remaining detections overlapping it; the most confident group member is
    kept. Returns a new list; input dicts are not copied.
    """
    if not results:
        return []

    def _iou(a, b):
        # Intersection rectangle of the two [x1, y1, x2, y2] boxes.
        left = max(a[0], b[0])
        top = max(a[1], b[1])
        right = min(a[2], b[2])
        bottom = min(a[3], b[3])
        if right < left or bottom < top:
            return 0.0
        inter = (right - left) * (bottom - top)
        area_a = (a[2] - a[0]) * (a[3] - a[1])
        area_b = (b[2] - b[0]) * (b[3] - b[1])
        denom = area_a + area_b - inter
        return inter / denom if denom > 0 else 0

    merged = []
    consumed = set()
    for anchor_idx, anchor in enumerate(results):
        if anchor_idx in consumed:
            continue
        consumed.add(anchor_idx)
        group = [anchor]
        for other_idx, other in enumerate(results):
            if other_idx in consumed:
                continue
            if _iou(anchor['bbox'], other['bbox']) > iou_threshold:
                group.append(other)
                consumed.add(other_idx)
        if len(group) == 1:
            merged.append(group[0])
        else:
            # Keep only the most confident detection of the group.
            merged.append(max(group, key=lambda d: d['confidence']))
    return merged
def classify_text(text):
    """Map a detected string to one of the TEXT_PATTERNS categories.

    All whitespace is removed and the text upper-cased before matching;
    the first pattern (in TEXT_PATTERNS insertion order) matching at the
    start of the string wins. Returns 'Unknown' for empty or unmatched text.
    """
    if not text:
        return 'Unknown'
    # Normalize: upper-case and strip every whitespace character.
    normalized = re.sub(r'\s+', '', text.strip().upper())
    return next(
        (kind for kind, pattern in TEXT_PATTERNS.items() if re.match(pattern, normalized)),
        'Unknown'
    )
def annotate_image(image, results):
    """Draw a labelled, colour-coded bounding box for every detection.

    Args:
        image: PIL image (converted to RGB if needed).
        results: detection dicts with 'text', 'bbox', 'confidence' and
            optionally 'text_type'.

    Returns:
        the annotated PIL image.
    """
    # Ensure a colour-capable mode before drawing.
    if image.mode != 'RGB':
        image = image.convert('RGB')
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("arial.ttf", 20)
    except IOError:
        font = ImageFont.load_default()
    # One colour per category; anything unlisted falls back to 'Unknown'.
    colors = {
        'Line_Number': "#FF0000",        # Bright Red
        'Equipment_Tag': "#00FF00",      # Bright Green
        'Instrument_Tag': "#0000FF",     # Bright Blue
        'Valve_Number': "#FFA500",       # Bright Orange
        'Pipe_Size': "#FF00FF",          # Bright Magenta
        'Process_Instrument': "#00FFFF", # Bright Cyan
        'Nozzle': "#FFFF00",             # Yellow
        'Pipe_Connector': "#800080",     # Purple
        'Unknown': "#FF4444"             # Light Red
    }
    for detection in results:
        category = detection.get('text_type', 'Unknown')
        outline_color = colors.get(category, colors['Unknown'])
        # Box around the detection.
        draw.rectangle(detection['bbox'], outline=outline_color, width=3)
        # Label: text, confidence, and the category when it is known.
        label = f"{detection['text']} ({detection['confidence']:.2f})"
        if category != 'Unknown':
            label += f" [{category}]"
        anchor = (detection['bbox'][0], detection['bbox'][1] - 20)
        # White backdrop keeps the label readable over dense line work.
        draw.rectangle(draw.textbbox(anchor, label, font=font), fill="#FFFFFF")
        draw.text(anchor, label, fill=outline_color, font=font)
    return image
def save_annotated_image(image, path, storage):
    """Serialize the image as an uncompressed PNG and persist it via storage.

    compress_level=0 with optimize=False trades file size for maximum
    fidelity, which matters for fine line work in drawings.
    """
    buffer = io.BytesIO()
    image.save(buffer, format='PNG', optimize=False, compress_level=0)
    storage.save_file(path, buffer.getvalue())
if __name__ == "__main__":
    from storage import StorageFactory
    import logging

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Initialize storage
    storage = StorageFactory.get_storage()

    # Test file paths
    file_path = "processed_pages/10219-1-DG-BC-00011.01-REV_A_page_1_text.png"
    result_path = "results"

    try:
        # Ensure result directory exists
        os.makedirs(result_path, exist_ok=True)

        # Process the drawing
        logger.info(f"Processing file: {file_path}")
        results, summary = process_drawing(file_path, result_path, storage)
        # process_drawing returns (None, None) on failure; the original code
        # would crash with a TypeError on the first subscript below.
        if results is None or summary is None:
            raise RuntimeError(f"Processing failed for {file_path}")

        # Print detailed results
        print("\n=== DETAILED DETECTION RESULTS ===")
        print(f"\nTotal Detections: {summary['total_detections']}")

        print("\nBreakdown by Text Type:")
        print("-" * 50)
        for text_type, stats in summary['by_type'].items():
            if stats['count'] > 0:
                print(f"\n{text_type}:")
                print(f"  Count: {stats['count']}")
                print(f"  Average Confidence: {stats['avg_confidence']:.2f}")
                # Per-item lists exist only in generate_detailed_summary()
                # output; process_drawing() summaries have no 'items' key.
                items = stats.get('items', [])
                if items:
                    print("  Items:")
                    for item in items:
                        print(f"    - {item['text']} (conf: {item['confidence']:.2f}, source: {item['source']})")

        print("\nBreakdown by OCR Engine:")
        print("-" * 50)
        for source, stats in summary['by_source'].items():
            # Each value is a stats dict; the original printed the whole dict
            # as if it were a bare count.
            print(f"{source}: {stats['count']} detections")

        # Only generate_detailed_summary() output carries a histogram; the
        # original unconditionally indexed it and raised KeyError.
        if 'confidence_ranges' in summary:
            print("\nConfidence Distribution:")
            print("-" * 50)
            for range_name, count in summary['confidence_ranges'].items():
                print(f"{range_name}: {count} detections")

        # Print output paths
        print("\nOutput Files:")
        print("-" * 50)
        print(f"Annotated Image: {results['image_path']}")
        print(f"JSON Results: {results['json_path']}")
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        raise