Spaces:
Runtime error
Runtime error
| import asyncio | |
| from PIL import Image | |
| import pytesseract | |
| import re, cv2 | |
| import imutils | |
| from concurrent.futures import ThreadPoolExecutor | |
| from app.models.ocrtemplate import * | |
| from app.core.database import get_database | |
| from app.core.config import settings | |
| from typing import Any | |
| from fastapi import HTTPException | |
| from pytesseract import Output | |
| pytesseract.pytesseract.tesseract_cmd = r'C:\\Program Files\\Tesseract-OCR\\tesseract.exe' | |
| def identify_structure(line): | |
| line = line.strip() | |
| if line.count(':') > 1: | |
| return 'mixed-column' | |
| if ':' in line or re.search(r'^[A-Za-z]+\s+[A-Za-z0-9]+', line): | |
| return 'key-value' | |
| uppercase_words = re.findall(r'\b[A-Z]+\b', line) | |
| numbers = re.findall(r'\b\d+\b', line) | |
| if len(uppercase_words) > 1: | |
| return 'table-header' | |
| if len(numbers) > 1 and len(uppercase_words) <= 1: | |
| return 'table-row' | |
| return 'text' | |
| def format_extracted_text(text): | |
| lines = text.split('\n') | |
| lines = [line.strip() for line in lines if line.strip()] | |
| formatted_text = [] | |
| in_table = False | |
| for line in lines: | |
| structure = identify_structure(line) | |
| if structure == 'mixed-column': | |
| parts = line.split(':') | |
| formatted_parts = [f"{parts[i].strip()}: {parts[i+1].strip()}" for i in range(0, len(parts)-1, 2)] | |
| formatted_text.extend(formatted_parts) | |
| in_table = False | |
| elif structure == 'key-value': | |
| formatted_text.append(line) | |
| in_table = False | |
| elif structure == 'table-header': | |
| formatted_text.append(line) | |
| in_table = True | |
| elif structure == 'table-row' and in_table: | |
| formatted_text.append(line) | |
| else: | |
| if in_table: | |
| in_table = False | |
| formatted_text.append("\n") | |
| formatted_text.append(line) | |
| return "\n".join(formatted_text) | |
| def refine_text_formatting(text): | |
| text = re.sub(r'\s+', ' ', text) | |
| text = re.sub(r'\.\s', '.\n', text) | |
| text = re.sub(r'\s*:\s*', ': ', text) | |
| return text | |
| def do_ocr(image_path): | |
| image = Image.open(image_path) | |
| extracted_text = pytesseract.image_to_string(image) | |
| formatted_text = format_extracted_text(extracted_text) | |
| return formatted_text | |
| async def do_ocr_tesseract(image_path): | |
| loop = asyncio.get_event_loop() | |
| with ThreadPoolExecutor() as pool: | |
| formatted_text = await loop.run_in_executor(pool, do_ocr, image_path) | |
| return formatted_text | |
| async def create_data_from_template(template_name:str, fields:Dict[str, str], user_id:str) -> OCRTemplateInDB: | |
| template = OCRTemplateInDB(template_name=template_name, fields=fields, user_id=user_id) | |
| db = get_database(settings.MongoDB_NAME) | |
| result = await db["extracted data"].insert_one(template.dict()) | |
| if template: | |
| return template | |
| return None | |
| def preprocess_image(image: Any) -> Any: | |
| # Convert to grayscale | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| # Apply Gaussian blur | |
| blur = cv2.GaussianBlur(gray, (3, 3), 0) | |
| # Apply Otsu's thresholding | |
| thresh = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)[1] | |
| # Morph open to remove noise and invert image | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) | |
| opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel, iterations=1) | |
| invert = 255 - opening | |
| return invert | |
| async def detect_rotation(image_path: str) -> Any: | |
| # Load the input image | |
| image = cv2.imread(image_path) | |
| if image is None: | |
| raise HTTPException(status_code=400, detail="Image not found or unable to read") | |
| # Convert from BGR to RGB channel ordering | |
| rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
| # Use Tesseract to determine the text orientation | |
| results = pytesseract.image_to_osd(rgb, output_type=Output.DICT) | |
| # Display the orientation information | |
| print("[INFO] detected orientation: {}".format(results["orientation"])) | |
| print("[INFO] rotate by {} degrees to correct".format(results["rotate"])) | |
| print("[INFO] detected script: {}".format(results["script"])) | |
| # Rotate the image to correct the orientation | |
| rotated = imutils.rotate_bound(image, angle=results["rotate"]) | |
| return rotated | |
| async def tesseract_ocr(image_path: str) -> str: | |
| # Detect rotation and get the image | |
| image = await detect_rotation(image_path) | |
| # Preprocess the image | |
| preprocessed_image = preprocess_image(image) | |
| # Perform OCR using Tesseract | |
| result = pytesseract.image_to_string(preprocessed_image, config='--psm 6') | |
| formatted_text = format_extracted_text(result) | |
| return formatted_text | |
| # Example usage | |
| async def main(): | |
| image_path = 'KTP.jpg' | |
| formatted_text = await do_ocr_tesseract(image_path) | |
| formatted_text_pre = await tesseract_ocr(image_path) | |
| print(formatted_text) | |
| print(formatted_text_pre) | |
| # asyncio.run(main()) | |