|
|
from typing import Dict, List, Optional, Tuple |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from paddleocr import PaddleOCR |
|
|
import easyocr |
|
|
from PIL import Image |
|
|
import io |
|
|
import google.generativeai as genai |
|
|
from src.document_config import DocumentType, DocumentRequirement, HealthcareProcess, get_process_requirements |
|
|
import json |
|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load variables from a local .env file into the process environment at import
# time — supplies GOOGLE_API_KEY, which DocumentOCRService.__init__ reads.
load_dotenv()
|
|
|
|
|
class DocumentOCRService:
    """OCR pipeline for Indonesian healthcare documents.

    Combines three engines:
      * PaddleOCR  — structured ID documents (KTP, KK, BPJS cards),
      * EasyOCR    — free-form documents (bills, prescriptions, ...),
      * Gemini VLM — authenticity / consistency analysis of the image,
    and can validate the extracted fields against the document requirements
    of a given healthcare process.
    """

    # Document types parsed as structured ID cards via PaddleOCR;
    # everything else is treated as unstructured text via EasyOCR.
    _STRUCTURED_TYPES = ('KTP', 'KK', 'BPJS')

    # Per-document-type mapping of output field name -> anchor label text
    # used to locate the field inside an OCR'd line.
    # NOTE(review): 'KK' is routed to structured parsing but has no mappings
    # here, so KK cards currently yield an empty dict — confirm whether KK
    # field anchors should be added.
    _FIELD_MAPPINGS: Dict[str, Dict[str, str]] = {
        'KTP': {
            'NIK': 'NIK',
            'Nama': 'Nama',
            'Tempat/Tgl Lahir': 'Tempat/Tgl Lahir',
            'Alamat': 'Alamat',
            'RT/RW': 'RT/RW',
            'Kel/Desa': 'Kel/Desa',
            'Kecamatan': 'Kecamatan',
            'Agama': 'Agama',
            'Status Perkawinan': 'Status Perkawinan',
            'Pekerjaan': 'Pekerjaan',
            'Kewarganegaraan': 'Kewarganegaraan',
        },
        'BPJS': {
            'Nomor Kartu': 'Nomor Kartu',
            'Nama Peserta': 'Nama Peserta',
            'No. KTP': 'No. KTP',
            'Faskes Tingkat 1': 'Faskes Tingkat 1',
        },
    }

    def __init__(self):
        """Initialise both OCR engines and the Gemini model.

        Heavy constructor: the OCR engines load (and may download) model
        weights. Requires the GOOGLE_API_KEY environment variable; if it is
        missing, os.getenv returns None and genai raises on the first
        request rather than here.
        """
        # Angle classification handles rotated text; 'id' = Indonesian.
        self.paddle_ocr = PaddleOCR(use_angle_cls=True, lang='id')
        self.easy_ocr = easyocr.Reader(['id'])
        genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))
        self.gemini_model = genai.GenerativeModel('gemini-pro-vision')

    def preprocess_image(self, image: np.ndarray) -> np.ndarray:
        """Return a binarised copy of *image* for better OCR results.

        Args:
            image: BGR image (as produced by cv2.imread / cv2.imdecode).

        Returns:
            Single-channel, adaptively thresholded image.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Adaptive (Gaussian) thresholding copes with uneven lighting on
        # photographed documents better than one global threshold.
        return cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
        )

    def process_document(self, image: np.ndarray, document_type: str, process_type: Optional[str] = None) -> Dict:
        """Run OCR, Gemini analysis and (optionally) process validation.

        Args:
            image: Input image as a BGR numpy array.
            document_type: Type of document ('KTP', 'KK', 'BPJS', ...).
            process_type: Healthcare process value (optional).

        Returns:
            Dict with keys 'extracted_data', 'gemini_analysis' and
            'validation_result' (None when no process_type was given).

        Raises:
            ValueError: if process_type is not a valid HealthcareProcess value.
        """
        processed_image = self.preprocess_image(image)

        if document_type in self._STRUCTURED_TYPES:
            ocr_result = self.paddle_ocr.ocr(processed_image, cls=True)
            extracted_data = self._parse_structured_document(ocr_result, document_type)
        else:
            ocr_result = self.easy_ocr.readtext(processed_image)
            extracted_data = self._parse_unstructured_document(ocr_result, document_type)

        # Gemini sees the original (colour) image, not the binarised one.
        gemini_analysis = self._analyze_with_gemini(image, document_type, extracted_data)

        validation_result = None
        if process_type:
            # HealthcareProcess(...) raises ValueError for unknown values;
            # deliberately propagated to the caller.
            validation_result = self._validate_against_process(
                extracted_data,
                document_type,
                HealthcareProcess(process_type),
            )

        return {
            "extracted_data": extracted_data,
            "gemini_analysis": gemini_analysis,
            "validation_result": validation_result,
        }

    def _analyze_with_gemini(self, image: np.ndarray, document_type: str, ocr_data: Dict) -> Dict:
        """Ask the Gemini VLM to assess the document image and OCR output.

        Args:
            image: Original BGR image.
            document_type: Document-type label, interpolated into the prompt.
            ocr_data: Fields already extracted by OCR (must be JSON-serialisable).

        Returns:
            Dict with 'analysis' (model text) and 'confidence_score'
            (None when the response exposes no score).
        """
        ok, buffer = cv2.imencode('.jpg', image)
        if not ok:
            # BUGFIX: the success flag was previously ignored, so an encode
            # failure would send an empty/invalid payload to the API.
            raise ValueError("Failed to encode image as JPEG for Gemini analysis")

        # BUGFIX: generate_content does not accept raw bytes in the content
        # list; wrap them in a Blob-style dict with an explicit MIME type.
        image_part = {"mime_type": "image/jpeg", "data": buffer.tobytes()}

        prompt = f"""
        Analyze this {document_type} document and provide:
        1. Document authenticity assessment
        2. Any potential issues or inconsistencies
        3. Additional information not captured by OCR
        4. Recommendations for document improvement if needed

        OCR extracted data: {json.dumps(ocr_data, indent=2)}
        """

        response = self.gemini_model.generate_content([prompt, image_part])

        # BUGFIX: the old hasattr(response, 'candidates') guard did not
        # protect the '.score' access, which raised AttributeError because
        # Candidate objects expose no 'score'; getattr defaults to None.
        candidates = getattr(response, "candidates", None)
        score = getattr(candidates[0], "score", None) if candidates else None

        return {
            "analysis": response.text,
            "confidence_score": score,
        }

    def _validate_against_process(
        self,
        extracted_data: Dict,
        document_type: str,
        process: "HealthcareProcess",
    ) -> Dict:
        """Check extracted fields against a process's document requirements.

        Returns:
            Dict with 'is_valid' plus either a 'message' (document not
            required for this process) or 'missing_fields' and
            'document_requirement'.
        """
        requirements = get_process_requirements(process)
        document_requirement = next(
            (req for req in requirements if req.document_type.value == document_type),
            None,
        )

        if document_requirement is None:
            return {
                "is_valid": False,
                "message": f"Document type {document_type} is not required for this process",
            }

        # A field counts as missing only when its rule is exactly "required"
        # and OCR produced no value for it.
        missing_fields = [
            field
            for field, rule in document_requirement.validation_rules.items()
            if rule == "required" and field not in extracted_data
        ]

        return {
            "is_valid": not missing_fields,
            "missing_fields": missing_fields,
            "document_requirement": document_requirement.description,
        }

    def get_process_requirements(self, process: "HealthcareProcess") -> Dict:
        """Return required and optional documents for *process*.

        Note: this method shadows the module-level get_process_requirements
        helper, which it delegates to.
        """
        requirements = get_process_requirements(process)
        return {
            "required_documents": [
                self._describe_requirement(req)
                for req in requirements
                if req.is_required
            ],
            "optional_documents": [
                self._describe_requirement(req)
                for req in requirements
                if not req.is_required
            ],
        }

    @staticmethod
    def _describe_requirement(req: "DocumentRequirement") -> Dict:
        """Serialise a DocumentRequirement into a plain JSON-friendly dict."""
        return {
            "type": req.document_type.value,
            "description": req.description,
            "validation_rules": req.validation_rules,
        }

    def _parse_structured_document(self, ocr_result: List, document_type: str) -> Dict:
        """Parse PaddleOCR output for structured documents (KTP, KK, BPJS).

        Args:
            ocr_result: OCR lines, each shaped [bbox, (text, confidence)].
            document_type: Selects which field anchors are searched for.

        Returns:
            Mapping of field name -> extracted value; empty for document
            types with no configured mappings (currently e.g. 'KK').
        """
        mappings = self._FIELD_MAPPINGS.get(document_type, {})
        return self._extract_labelled_fields(ocr_result, mappings)

    @staticmethod
    def _extract_labelled_fields(ocr_result: List, field_mappings: Dict[str, str]) -> Dict:
        """Pull 'label: value' pairs out of OCR lines using anchor substrings."""
        extracted_data: Dict[str, str] = {}
        for line in ocr_result:
            text = line[1][0]  # PaddleOCR line: [bbox, (text, confidence)]
            for field, anchor in field_mappings.items():
                # Case-insensitive anchor match; the split itself stays
                # case-sensitive, matching the original behaviour.
                if anchor.lower() in text.lower():
                    # Take everything after the anchor label. BUGFIX: also
                    # strip the separator colon, which the old code left in
                    # the value (e.g. ': 1234' instead of '1234').
                    value = text.split(anchor)[-1].strip().lstrip(':').strip()
                    extracted_data[field] = value
        return extracted_data

    def _parse_unstructured_document(self, ocr_result: List, document_type: str) -> Dict:
        """Parse EasyOCR output for free-form documents (bills, prescriptions).

        Args:
            ocr_result: EasyOCR triples (bbox, text, confidence).
            document_type: Currently unused; kept for interface symmetry.

        Returns:
            {'full_text': all recognised text joined with single spaces}.
        """
        full_text = " ".join(text for _, text, _ in ocr_result)
        return {"full_text": full_text}