Spaces:
Runtime error
Runtime error
| """ | |
| ocr_pipeline.py - LOGOS OCR Pipeline | |
| Extract text from architectural diagrams and UI screenshots using EasyOCR. | |
| """ | |
| import os | |
| import json | |
| from typing import List, Dict, Optional | |
| from dataclasses import dataclass, asdict | |
| try: | |
| import easyocr | |
| EASYOCR_AVAILABLE = True | |
| except ImportError: | |
| EASYOCR_AVAILABLE = False | |
| print("[OCR] EasyOCR not available. Install with: pip install easyocr") | |
@dataclass
class TextBlock:
    """A single detected text region.

    Declared as a dataclass so it can be built with keyword arguments and
    converted with ``dataclasses.asdict`` when results are exported.
    """

    # Recognized text of the region.
    text: str
    # Recognition confidence reported by the OCR engine.
    confidence: float
    # Corner points [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]; None when the
    # caller asked for results without bounding boxes.
    bbox: Optional[List[List[int]]] = None
@dataclass
class OCRResult:
    """OCR result for a single image.

    Declared as a dataclass so it can be built with keyword arguments and
    converted (together with its nested text blocks) by ``dataclasses.asdict``.
    """

    # Base name of the image file.
    filename: str
    # Path of the image as it was passed to the pipeline.
    path: str
    # Detected text regions for this image.
    text_blocks: List["TextBlock"]
    # All detected text joined with single spaces.
    full_text: str
    # Number of whitespace-separated tokens in full_text.
    word_count: int
class LOGOSOCRPipeline:
    """
    OCR pipeline for extracting text from LOGOS protocol screenshots.

    Wraps an ``easyocr.Reader`` and provides single-image extraction,
    folder batch processing, JSON export and substring search.
    """

    def __init__(self, languages: Optional[List[str]] = None, gpu: bool = False):
        """
        Initialize the OCR pipeline.

        Args:
            languages: List of language codes (default: ['en'])
            gpu: Whether to use GPU acceleration

        Raises:
            ImportError: If the easyocr package could not be imported.
        """
        if not EASYOCR_AVAILABLE:
            raise ImportError("EasyOCR is required. Install with: pip install easyocr")
        self.languages = languages or ['en']
        self.reader = easyocr.Reader(self.languages, gpu=gpu)
        print(f"[OCR] Initialized EasyOCR with languages: {self.languages}")

    @staticmethod
    def _to_native_bbox(bbox):
        """Return bbox as nested lists of plain Python numbers (None stays None).

        The OCR engine may hand back numpy scalars, which ``json`` cannot
        serialize; anything exposing ``.item()`` is unwrapped to its Python
        equivalent, everything else is kept unchanged.
        """
        if bbox is None:
            return None
        return [
            [coord.item() if hasattr(coord, "item") else coord for coord in point]
            for point in bbox
        ]

    def extract_text(self, image_path: str, detail: bool = True) -> "OCRResult":
        """
        Extract text from a single image.

        Args:
            image_path: Path to the image file
            detail: If True, include bounding boxes in the returned blocks

        Returns:
            OCRResult with extracted text blocks. ``full_text`` is ordered by
            the Y coordinate of each region's first corner regardless of
            ``detail``.

        Raises:
            FileNotFoundError: If image_path does not exist.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        # Each detection is unpacked as (bbox, text, confidence).
        detections = self.reader.readtext(image_path)

        text_blocks = []
        positioned_text = []  # (top Y, text) pairs used only for ordering
        for bbox, text, confidence in detections:
            native_bbox = self._to_native_bbox(bbox)
            text_blocks.append(
                TextBlock(
                    text=text,
                    # float() keeps the stored value a plain, serializable float.
                    confidence=round(float(confidence), 4),
                    bbox=native_bbox if detail else None,
                )
            )
            # Order on the detected box itself so that dropping boxes from the
            # result (detail=False) does not change the reading order.
            top_y = native_bbox[0][1] if native_bbox else 0
            positioned_text.append((top_y, text))

        # Stable sort: regions on the same Y keep the engine's order.
        positioned_text.sort(key=lambda pair: pair[0])
        full_text = " ".join(text for _, text in positioned_text)

        return OCRResult(
            filename=os.path.basename(image_path),
            path=image_path,
            text_blocks=text_blocks,
            full_text=full_text,
            word_count=len(full_text.split()),
        )

    def batch_process(self, folder: str, extensions: Optional[List[str]] = None) -> List["OCRResult"]:
        """
        Process all images in a folder.

        Args:
            folder: Path to folder containing images
            extensions: File extensions to include, matched case-insensitively
                (default: ['.png', '.jpg', '.jpeg', '.bmp', '.webp'])

        Returns:
            List of OCRResult objects, one per image that was processed
            successfully. Images that raise are reported and skipped.
        """
        if extensions is None:
            extensions = ['.png', '.jpg', '.jpeg', '.bmp', '.webp']
        # File suffixes are lower-cased below, so lower-case the wanted set
        # too; otherwise a caller passing '.PNG' would match nothing.
        wanted = {ext.lower() for ext in extensions}

        files = sorted(
            name for name in os.listdir(folder)
            if os.path.splitext(name)[1].lower() in wanted
            and os.path.isfile(os.path.join(folder, name))  # skip directories
        )
        print(f"[OCR] Processing {len(files)} images from {folder}")

        results = []
        for i, filename in enumerate(files):
            path = os.path.join(folder, filename)
            try:
                result = self.extract_text(path)
            except Exception as e:
                # Best effort: one unreadable image must not abort the batch.
                print(f"[OCR] Error processing {filename}: {e}")
            else:
                results.append(result)
                print(f"[OCR] [{i+1}/{len(files)}] {filename}: {result.word_count} words")
        return results

    def export_to_json(self, results: List["OCRResult"], output_path: str):
        """Export OCR results to a UTF-8 JSON file at output_path."""
        data = [asdict(r) for r in results]
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"[OCR] Exported {len(results)} results to {output_path}")

    def search(self, results: List["OCRResult"], query: str) -> List["OCRResult"]:
        """Return the results whose full_text contains query, ignoring case."""
        query_lower = query.lower()
        return [r for r in results if query_lower in r.full_text.lower()]
def build_knowledge_base(folder: str, output_path: str = "logos_knowledge_base.json"):
    """
    Run OCR over every screenshot in a folder and save the results as JSON.

    Args:
        folder: Path to LOGOS Screenshots folder
        output_path: Path for output JSON file

    Returns:
        The list of per-image results produced by the batch run.
    """
    ocr = LOGOSOCRPipeline(gpu=False)
    ocr_results = ocr.batch_process(folder)
    ocr.export_to_json(ocr_results, output_path)

    # Tally the words across all processed images for the summary.
    word_total = 0
    for item in ocr_results:
        word_total += item.word_count

    summary_lines = (
        "\n[OCR] Knowledge Base Summary:",
        f" - Images processed: {len(ocr_results)}",
        f" - Total words extracted: {word_total}",
        f" - Output file: {output_path}",
    )
    for line in summary_lines:
        print(line)

    return ocr_results
# Command-line entry point: ocr_pipeline.py <folder_path> [output.json]
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # The folder argument is mandatory; show usage and exit non-zero.
        print("Usage: python ocr_pipeline.py <folder_path> [output.json]")
        sys.exit(1)

    folder = cli_args[0]
    if len(cli_args) > 1:
        output = cli_args[1]
    else:
        output = "logos_knowledge_base.json"
    build_knowledge_base(folder, output)