kyc-backend / app /services /ktp_ocr.py
supraptin's picture
Initial deployment to Hugging Face Spaces
bd2c5ca
"""
KTP OCR Service for extracting and parsing Indonesian ID card data.
This service uses PaddleOCR to extract text from KTP images and parses
the extracted text into structured fields with sanitization.
"""
import re
import logging
from typing import Dict, Any, Optional, List, Tuple
from dataclasses import dataclass, field
from datetime import datetime
import cv2
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class KTPField:
    """A single KTP field value paired with a confidence score.

    ``raw_value`` carries the text the value was derived from and defaults
    to an empty string when the caller does not supply it.
    """

    value: str
    confidence: float
    raw_value: str = ""
@dataclass
class KTPData:
    """Structured result of parsing a KTP; every field defaults to ``None``."""

    provinsi: Optional[KTPField] = None
    kabupaten_kota: Optional[KTPField] = None
    nik: Optional[KTPField] = None
    nama: Optional[KTPField] = None
    tempat_lahir: Optional[KTPField] = None
    tanggal_lahir: Optional[KTPField] = None
    jenis_kelamin: Optional[KTPField] = None
    golongan_darah: Optional[KTPField] = None
    alamat: Optional[KTPField] = None
    rt_rw: Optional[KTPField] = None
    kelurahan_desa: Optional[KTPField] = None
    kecamatan: Optional[KTPField] = None
    agama: Optional[KTPField] = None
    status_perkawinan: Optional[KTPField] = None
    pekerjaan: Optional[KTPField] = None
    kewarganegaraan: Optional[KTPField] = None
    berlaku_hingga: Optional[KTPField] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise all fields into a plain dictionary for an API response.

        Returns:
            A dict keyed by field name. A populated field maps to a dict with
            ``value``, ``confidence`` and ``raw_value``; an unset field maps
            to ``None``.
        """
        exported_names = (
            'provinsi', 'kabupaten_kota', 'nik', 'nama', 'tempat_lahir',
            'tanggal_lahir', 'jenis_kelamin', 'golongan_darah', 'alamat',
            'rt_rw', 'kelurahan_desa', 'kecamatan', 'agama', 'status_perkawinan',
            'pekerjaan', 'kewarganegaraan', 'berlaku_hingga',
        )
        payload = {}
        for name in exported_names:
            entry = getattr(self, name)
            if not entry:
                payload[name] = None
                continue
            payload[name] = {
                'value': entry.value,
                'confidence': entry.confidence,
                'raw_value': entry.raw_value,
            }
        return payload
class KTPOCRService:
"""
Service for performing OCR on Indonesian KTP (ID card) images.
Features:
- Text extraction using PaddleOCR
- Field parsing and validation
- NIK validation
- Data sanitization
"""
def __init__(self):
    """Create an uninitialised service and fill in its lookup tables.

    No OCR engine is built here: ``reader`` stays ``None`` and
    ``initialized`` stays ``False``.
    """
    self.reader = None
    self.initialized = False

    # Accepted values for the closed-vocabulary fields.
    self.valid_genders = ['LAKI-LAKI', 'PEREMPUAN']
    self.valid_religions = ['ISLAM', 'KRISTEN', 'KATOLIK', 'HINDU', 'BUDDHA', 'KONGHUCU']
    self.valid_marital_status = ['BELUM KAWIN', 'KAWIN', 'CERAI HIDUP', 'CERAI MATI']
    self.valid_blood_types = [
        'A', 'B', 'AB', 'O',
        'A+', 'A-', 'B+', 'B-', 'AB+', 'AB-', 'O+', 'O-',
        '-',
    ]
    self.valid_nationalities = ['WNI', 'WNA', 'INDONESIA']

    # Label spellings used to locate each field in the OCR text.
    self.field_labels = {
        'nik': ['NIK', 'N I K', 'NlK'],
        'nama': ['Nama', 'NAMA', 'Name'],
        'tempat_tanggal_lahir': [
            'Tempat/Tgl Lahir', 'Tempat/TglLahir', 'Tempat / Tgl Lahir', 'Tempat/Tgl.Lahir',
        ],
        'jenis_kelamin': ['Jenis Kelamin', 'Jenis kelamin', 'JenisKelamin', 'JENIS KELAMIN'],
        'golongan_darah': ['Gol. Darah', 'Gol.Darah', 'Gol Darah', 'GOL. DARAH'],
        'alamat': ['Alamat', 'ALAMAT', 'Address'],
        'rt_rw': ['RT/RW', 'RT / RW', 'RTRW'],
        'kelurahan_desa': ['Kel/Desa', 'Kel / Desa', 'Kelurahan/Desa', 'KEL/DESA'],
        'kecamatan': ['Kecamatan', 'KECAMATAN', 'Kec'],
        'agama': ['Agama', 'AGAMA', 'Religion'],
        'status_perkawinan': ['Status Perkawinan', 'Status perkawinan', 'STATUS PERKAWINAN'],
        'pekerjaan': ['Pekerjaan', 'PEKERJAAN', 'Occupation'],
        'kewarganegaraan': ['Kewarganegaraan', 'KEWARGANEGARAAN', 'Nationality'],
        'berlaku_hingga': ['Berlaku Hingga', 'Berlaku hingga', 'BERLAKU HINGGA', 'Valid Until'],
    }
def initialize(self) -> None:
    """Build the PaddleOCR reader once; repeated calls do nothing.

    Raises:
        Exception: Any error raised while importing or constructing
            PaddleOCR is logged and re-raised unchanged.
    """
    if not self.initialized:
        try:
            from paddleocr import PaddleOCR

            logger.info("Initializing PaddleOCR reader...")
            # English model; it covers the Latin characters used on a KTP.
            self.reader = PaddleOCR(lang='en')
            self.initialized = True
            logger.info("PaddleOCR reader initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize PaddleOCR: {e}")
            raise
def preprocess_image(self, image: np.ndarray) -> np.ndarray:
    """
    Clean up a KTP image before it is handed to the OCR reader.

    The pipeline is: grayscale -> CLAHE contrast enhancement -> non-local
    means denoising -> adaptive Gaussian threshold -> back to three channels.

    Args:
        image: Input image (BGR format); a 2-D array is treated as grayscale
    Returns:
        Binarised image converted back to BGR
    """
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        # Already single-channel: work on a copy so the input is untouched.
        gray = image.copy()

    contrast = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)
    smooth = cv2.fastNlMeansDenoising(contrast, None, 10, 7, 21)
    binary = cv2.adaptiveThreshold(
        smooth,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        11,
        2,
    )
    # Return a three-channel image again for the OCR reader.
    return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)
def extract_text(
    self,
    image: np.ndarray,
    preprocess: bool = True
) -> List[Tuple[List[List[int]], str, float]]:
    """
    Extract text from KTP image using PaddleOCR.

    When ``preprocess`` is true the reader is run twice, on the preprocessed
    image and on the original, and both result sets are merged with
    ``_merge_ocr_results``.

    Args:
        image: Input image (BGR format)
        preprocess: Whether to preprocess the image
    Returns:
        List of (bounding_box, text, confidence) tuples
    Raises:
        RuntimeError: If the service has not been initialized
    """
    if not self.initialized:
        raise RuntimeError("KTP OCR service not initialized")

    def _to_tuples(raw) -> List[Tuple]:
        """Flatten one reader response into (bbox, text, confidence) tuples."""
        # Expected shape: [{'rec_texts': [...], 'rec_scores': [...], 'rec_polys': [...]}]
        # An empty response or a ``None`` first page means nothing was read.
        if not raw or raw[0] is None:
            return []
        page = raw[0]
        texts = page.get('rec_texts', [])
        scores = page.get('rec_scores', [])
        polys = page.get('rec_polys', [])
        converted = []
        for i, text in enumerate(texts):
            if i < len(polys):
                poly = polys[i]
                # Polygons may be array-like objects or already plain lists.
                bbox = poly.tolist() if hasattr(poly, 'tolist') else list(poly)
            else:
                bbox = []
            confidence = scores[i] if i < len(scores) else 0.0
            converted.append((bbox, text, confidence))
        return converted

    if not preprocess:
        return _to_tuples(self.reader.ocr(image))

    processed_results = _to_tuples(self.reader.ocr(self.preprocess_image(image)))
    # Also read the untouched image and keep the better reading of each text.
    original_results = _to_tuples(self.reader.ocr(image))
    return self._merge_ocr_results(processed_results, original_results)
def _merge_ocr_results(
    self,
    results1: List[Tuple],
    results2: List[Tuple]
) -> List[Tuple]:
    """Combine two OCR result lists, one entry per distinct text.

    Texts are compared after upper-casing and stripping. For each distinct
    text the entry with the strictly highest confidence is kept; on a tie the
    earlier entry wins. Output order follows first appearance of each text.
    """
    best_by_text = {}
    for bbox, text, conf in [*results1, *results2]:
        key = text.upper().strip()
        current = best_by_text.get(key)
        if current is None or current[2] < conf:
            best_by_text[key] = (bbox, text, conf)
    return list(best_by_text.values())
def parse_ktp_data(
    self,
    ocr_results: List[Tuple[List[List[int]], str, float]]
) -> KTPData:
    """
    Turn raw OCR tuples into a populated ``KTPData``.

    Results are ordered by the y-coordinate of their first bounding-box
    point (entries without a box sort as 0), blank texts are dropped, and
    each field extractor is then run over the remaining lines.

    Args:
        ocr_results: List of (bounding_box, text, confidence) tuples
    Returns:
        Structured KTP data
    """
    def _top_y(entry):
        bbox = entry[0]
        return bbox[0][1] if bbox else 0

    lines = []
    for _, text, conf in sorted(ocr_results, key=_top_y):
        stripped = text.strip()
        if stripped:
            lines.append((stripped, conf))

    # Single string of all lines, passed to every extractor alongside ``lines``.
    full_text = ' '.join(text for text, _ in lines)

    parsed = KTPData()
    parsed.nik = self._extract_nik(lines, full_text)
    parsed.provinsi, parsed.kabupaten_kota = self._extract_location(lines)
    parsed.nama = self._extract_field_value(lines, full_text, 'nama')
    parsed.tempat_lahir, parsed.tanggal_lahir = self._extract_birth_info(lines, full_text)
    parsed.jenis_kelamin = self._extract_gender(lines, full_text)
    parsed.golongan_darah = self._extract_blood_type(lines, full_text)
    parsed.alamat = self._extract_address(lines, full_text)
    parsed.rt_rw = self._extract_rt_rw(lines, full_text)
    parsed.kelurahan_desa = self._extract_field_value(lines, full_text, 'kelurahan_desa')
    parsed.kecamatan = self._extract_field_value(lines, full_text, 'kecamatan')
    parsed.agama = self._extract_religion(lines, full_text)
    parsed.status_perkawinan = self._extract_marital_status(lines, full_text)
    parsed.pekerjaan = self._extract_field_value(lines, full_text, 'pekerjaan')
    parsed.kewarganegaraan = self._extract_nationality(lines, full_text)
    parsed.berlaku_hingga = self._extract_validity(lines, full_text)
    return parsed
def _extract_nik(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Extract NIK (16-digit ID number).

    First looks for a single OCR line that contains exactly 16 digits once
    every non-digit is removed. If none is found, falls back to the first
    run of exactly 16 digits in ``full_text`` with whitespace removed; that
    fallback is reported with a fixed confidence of 0.7.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined into one string
    Returns:
        The NIK field, or ``None`` when no 16-digit number is found
    """
    for line_text, conf in lines:
        digits = re.sub(r'\D', '', line_text)
        if len(digits) == 16:
            return KTPField(
                value=digits,
                confidence=conf,
                raw_value=line_text
            )

    # Whitespace removal glues the number to neighbouring letters
    # (e.g. "NIK3201..."), where ``\b`` can never match, so the run is
    # delimited by "not a digit" on both sides instead.
    compact = re.sub(r'\s', '', full_text)
    match = re.search(r'(?<!\d)(\d{16})(?!\d)', compact)
    if match:
        return KTPField(
            value=match.group(1),
            confidence=0.7,  # Lower confidence for pattern match
            raw_value=match.group(1)
        )
    return None
def _extract_location(
    self,
    lines: List[Tuple[str, float]]
) -> Tuple[Optional[KTPField], Optional[KTPField]]:
    """Extract province and city from KTP header.

    Only the first five lines are inspected. The first line that yields a
    province and the first line that yields a regency/city win; later lines
    (for example a name that happens to contain "KOTA") no longer overwrite
    a value that was already found.

    Args:
        lines: (text, confidence) pairs in reading order
    Returns:
        ``(provinsi, kabupaten_kota)``; either element may be ``None``
    """
    provinsi = None
    kab_kota = None
    for i, (line_text, conf) in enumerate(lines[:5]):
        upper_text = line_text.upper()

        if provinsi is None and 'PROVINSI' in upper_text:
            prov_match = re.search(r'PROVINSI\s*[:\.]?\s*(.+)', upper_text)
            if prov_match:
                provinsi = KTPField(
                    value=self._sanitize_text(prov_match.group(1)),
                    confidence=conf,
                    raw_value=line_text
                )
            elif i + 1 < len(lines):
                # Keyword stands alone: the name is taken from the next line.
                next_text, next_conf = lines[i + 1]
                provinsi = KTPField(
                    value=self._sanitize_text(next_text),
                    confidence=next_conf,
                    raw_value=next_text
                )

        if kab_kota is None and ('KABUPATEN' in upper_text or 'KOTA' in upper_text):
            kab_match = re.search(r'(KABUPATEN|KOTA)\s*[:\.]?\s*(.+)', upper_text)
            if kab_match:
                # group(0) keeps the KABUPATEN/KOTA keyword as part of the value.
                kab_kota = KTPField(
                    value=self._sanitize_text(kab_match.group(0)),
                    confidence=conf,
                    raw_value=line_text
                )
    return provinsi, kab_kota
def _extract_birth_info(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Tuple[Optional[KTPField], Optional[KTPField]]:
    """Find the place and date of birth.

    A line containing one of the birth labels is split after the label; if
    the remainder holds a DD-MM-YYYY or DD/MM/YYYY date, the text before it
    becomes the place and the date becomes the birth date. While no birth
    date has been found, any line containing such a date supplies it.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined (not used by this extractor)
    Returns:
        ``(tempat_lahir, tanggal_lahir)``; either element may be ``None``
    """
    date_regex = r'(\d{2}[-/]\d{2}[-/]\d{4})'
    place_field = None
    date_field = None

    for raw_line, score in lines:
        haystack = raw_line.upper()
        label_keys = (
            label.upper()
            for label in self.field_labels.get('tempat_tanggal_lahir', [])
        )
        matched_label = next((key for key in label_keys if key in haystack), None)

        if matched_label is not None:
            remainder = haystack.split(matched_label)[-1].strip()
            remainder = re.sub(r'^[:\s]+', '', remainder)
            found = re.search(date_regex, remainder)
            if found:
                date_text = found.group(1)
                place_text = remainder[:found.start()].strip().rstrip(',')
                place_field = KTPField(
                    value=self._sanitize_text(place_text),
                    confidence=score,
                    raw_value=place_text
                )
                date_field = KTPField(
                    value=self._sanitize_date(date_text),
                    confidence=score,
                    raw_value=date_text
                )

        # Fallback: a bare date on any line while none has been recorded.
        if not date_field:
            bare = re.search(date_regex, raw_line)
            if bare:
                date_field = KTPField(
                    value=self._sanitize_date(bare.group(1)),
                    confidence=score,
                    raw_value=bare.group(1)
                )
    return place_field, date_field
def _extract_gender(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Return the first known gender that appears in any line.

    Lines are scanned in order; within a line the entries of
    ``valid_genders`` are tried in order and matched as upper-case
    substrings. ``full_text`` is not used.
    """
    for raw_line, score in lines:
        haystack = raw_line.upper()
        hit = next((g for g in self.valid_genders if g in haystack), None)
        if hit is not None:
            return KTPField(value=hit, confidence=score, raw_value=raw_line)
    return None
def _extract_blood_type(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Extract blood type (Golongan Darah).

    Only the text that follows the blood-type label is examined. Matching
    against the whole line is wrong because the label itself ("GOL. DARAH")
    contains the letter "A", which made every labelled line read as type A.
    If nothing follows the label, the next line is accepted when it is
    exactly one of the valid blood types.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined (not used by this extractor)
    Returns:
        The blood-type field, or ``None`` when no valid type is found
    """
    label_keys = [label.upper() for label in self.field_labels.get('golongan_darah', [])]
    for i, (line_text, conf) in enumerate(lines):
        upper_text = line_text.upper()
        label = next((key for key in label_keys if key in upper_text), None)
        if label is None:
            continue

        # Value candidate: first token after the label and its separators.
        rest = upper_text.split(label, 1)[1]
        rest = re.sub(r'^[\s:.]+', '', rest)
        tokens = rest.split()
        if tokens:
            # Trailing punctuation is noise; "-" is kept because it is a valid value.
            token = tokens[0].rstrip('.,;:')
            if token in self.valid_blood_types:
                return KTPField(
                    value=token,
                    confidence=conf,
                    raw_value=line_text
                )
        elif i + 1 < len(lines):
            # Label and value were read as separate lines.
            next_text, next_conf = lines[i + 1]
            candidate = next_text.strip().upper()
            if candidate in self.valid_blood_types:
                return KTPField(
                    value=candidate,
                    confidence=next_conf,
                    raw_value=next_text
                )
    return None
def _extract_address(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Extract address (Alamat).

    The address uses the same "text after the label, otherwise the next
    line" rule as the other labelled fields, so this delegates to
    ``_extract_field_value`` with the ``'alamat'`` labels instead of keeping
    a second copy of that logic.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined
    Returns:
        The address field, or ``None`` when no address label is found
    """
    return self._extract_field_value(lines, full_text, 'alamat')
def _extract_rt_rw(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Return the first ``NNN/NNN`` pair found in any line as RT/RW.

    Whitespace around the slash is tolerated in the input and removed in
    the returned value. ``full_text`` is not used.
    """
    pair_regex = re.compile(r'(\d{3})\s*/\s*(\d{3})')
    for raw_line, score in lines:
        hit = pair_regex.search(raw_line)
        if hit is None:
            continue
        rt, rw = hit.groups()
        return KTPField(value=f"{rt}/{rw}", confidence=score, raw_value=raw_line)
    return None
def _extract_religion(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Extract religion (Agama).

    Lines that carry the religion label are tried first, so a religion word
    appearing earlier on the card (for example inside a name) does not win
    over the actual field. If no labelled line contains a known religion,
    every line is scanned in order.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined (not used by this extractor)
    Returns:
        The religion field, or ``None`` when no known religion is found
    """
    label_keys = [label.upper() for label in self.field_labels.get('agama', [])]

    def _scan(labelled_only: bool) -> Optional[KTPField]:
        """Return the first religion hit, optionally on labelled lines only."""
        for line_text, conf in lines:
            upper_text = line_text.upper()
            if labelled_only and not any(key in upper_text for key in label_keys):
                continue
            for religion in self.valid_religions:
                if religion in upper_text:
                    return KTPField(
                        value=religion,
                        confidence=conf,
                        raw_value=line_text
                    )
        return None

    found = _scan(labelled_only=True)
    if found is None:
        found = _scan(labelled_only=False)
    return found
def _extract_marital_status(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Extract marital status (Status Perkawinan).

    The label word "PERKAWINAN" contains "KAWIN", so it is removed from each
    line before matching; otherwise the label alone, or a line such as
    "Status Perkawinan: CERAI HIDUP", would be reported as "KAWIN". Longer
    statuses are tried first so "BELUM KAWIN" is never shortened to "KAWIN".

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined (not used by this extractor)
    Returns:
        The marital-status field, or ``None`` when no known status is found
    """
    # sorted() is stable, so statuses of equal length keep their listed order.
    statuses = sorted(self.valid_marital_status, key=len, reverse=True)
    for line_text, conf in lines:
        searchable = line_text.upper().replace('PERKAWINAN', ' ')
        for status in statuses:
            if status in searchable:
                return KTPField(
                    value=status,
                    confidence=conf,
                    raw_value=line_text
                )
    return None
def _extract_nationality(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Return the first known nationality that appears in any line.

    Entries of ``valid_nationalities`` are matched as upper-case substrings;
    a match on ``INDONESIA`` is reported as ``WNI``. ``full_text`` is not
    used.
    """
    for raw_line, score in lines:
        haystack = raw_line.upper()
        for candidate in self.valid_nationalities:
            if candidate not in haystack:
                continue
            reported = 'WNI' if candidate == 'INDONESIA' else candidate
            return KTPField(value=reported, confidence=score, raw_value=raw_line)
    return None
def _extract_validity(
    self,
    lines: List[Tuple[str, float]],
    full_text: str
) -> Optional[KTPField]:
    """Read the validity period from a line carrying the validity label.

    On a labelled line, ``SEUMUR HIDUP`` is returned as-is; otherwise a
    DD-MM-YYYY or DD/MM/YYYY date on that line is returned with slashes
    replaced by dashes. Lines without the label are ignored. ``full_text``
    is not used.
    """
    date_regex = r'(\d{2}[-/]\d{2}[-/]\d{4})'
    for raw_line, score in lines:
        haystack = raw_line.upper()
        labelled = any(
            label.upper() in haystack
            for label in self.field_labels.get('berlaku_hingga', [])
        )
        if not labelled:
            continue
        if 'SEUMUR HIDUP' in haystack:
            return KTPField(value='SEUMUR HIDUP', confidence=score, raw_value=raw_line)
        found = re.search(date_regex, raw_line)
        if found:
            return KTPField(
                value=self._sanitize_date(found.group(1)),
                confidence=score,
                raw_value=raw_line
            )
    return None
def _extract_field_value(
    self,
    lines: List[Tuple[str, float]],
    full_text: str,
    field_name: str
) -> Optional[KTPField]:
    """Generic field value extraction.

    Finds the first line containing one of the labels configured for
    ``field_name`` (case-insensitive) and returns the text after the FIRST
    occurrence of that label. Splitting at the last occurrence would cut
    values that themselves contain the label, e.g. "Nama : PURNAMA". If
    nothing follows the label, the next line is used as the value.

    Args:
        lines: (text, confidence) pairs in reading order
        full_text: All line texts joined (not used by this extractor)
        field_name: Key into ``self.field_labels``
    Returns:
        The extracted field (value upper-cased and sanitized), or ``None``
    """
    label_keys = [label.upper() for label in self.field_labels.get(field_name, [])]
    for i, (line_text, conf) in enumerate(lines):
        upper_text = line_text.upper()
        for label in label_keys:
            if label not in upper_text:
                continue
            # maxsplit=1 keeps everything after the first label occurrence.
            rest = upper_text.split(label, 1)[1].strip()
            rest = re.sub(r'^[:\s]+', '', rest)
            if rest:
                return KTPField(
                    value=self._sanitize_text(rest),
                    confidence=conf,
                    raw_value=line_text
                )
            # Value might be on next line
            if i + 1 < len(lines):
                next_text, next_conf = lines[i + 1]
                return KTPField(
                    value=self._sanitize_text(next_text),
                    confidence=next_conf,
                    raw_value=next_text
                )
    return None
def _sanitize_text(self, text: str) -> str:
    """Sanitize extracted text.

    Collapses runs of whitespace to single spaces, then removes any mix of
    leading/trailing whitespace and ``. : , ; - _`` characters. Stripping
    both kinds in one pass matters: doing them separately leaves residue
    such as ``": FOO"`` for the input ``". : FOO"``. Letter case is left
    unchanged.

    Args:
        text: Raw text; ``None`` or an empty string yields ``""``
    Returns:
        The cleaned text
    """
    if not text:
        return ""
    collapsed = ' '.join(text.split())
    return collapsed.strip('.:,;-_ ')
def _sanitize_date(self, date_str: str) -> str:
    """Normalise the date separator to ``-``.

    Every ``/`` is replaced by ``-``; nothing else is changed or validated.
    A falsy input (``None`` or ``""``) yields an empty string.
    """
    return date_str.replace('/', '-') if date_str else ""
def validate_nik(self, nik: str) -> Dict[str, Any]:
    """
    Validate NIK and extract encoded information.

    NIK Format: PPKKCC-DDMMYY-XXXX
    - PP: Province code (2 digits)
    - KK: City/Regency code (2 digits)
    - CC: District code (2 digits)
    - DD: Birth date (01-31, add 40 for females)
    - MM: Birth month (01-12)
    - YY: Birth year (last 2 digits)
    - XXXX: Sequence number (4 digits)

    Besides the per-component range checks, the decoded day/month/year must
    form a real calendar date (so 31-02 is rejected).

    Args:
        nik: NIK string (16 digits); non-digit characters are ignored and
            ``None`` is treated as an empty string
    Returns:
        Dict with ``is_valid`` (bool), ``errors`` (list of messages) and
        ``extracted`` (decoded components; empty when the length is wrong)
    """
    result = {
        'is_valid': False,
        'errors': [],
        'extracted': {}
    }

    # Keep digits only; str()/or guard against None and non-string input.
    nik = re.sub(r'[^\d]', '', str(nik or ''))
    if len(nik) != 16:
        result['errors'].append(f"Invalid length: {len(nik)} (expected 16)")
        return result

    province_code = nik[0:2]
    city_code = nik[2:4]
    district_code = nik[4:6]
    birth_day = int(nik[6:8])
    birth_month = int(nik[8:10])
    birth_year = int(nik[10:12])
    sequence = nik[12:16]

    # Females have 40 added to the day component.
    is_female = birth_day > 40
    gender = 'PEREMPUAN' if is_female else 'LAKI-LAKI'
    actual_day = birth_day - 40 if is_female else birth_day

    day_in_range = 1 <= actual_day <= 31
    month_in_range = 1 <= birth_month <= 12
    if not day_in_range:
        result['errors'].append(f"Invalid birth day: {actual_day}")
    if not month_in_range:
        result['errors'].append(f"Invalid birth month: {birth_month}")

    # Two-digit years later than the current two-digit year are read as
    # 19xx; everything else is read as 20xx.
    current_year = datetime.now().year % 100
    if birth_year > current_year:
        full_year = 1900 + birth_year
    else:
        full_year = 2000 + birth_year

    birth_date = f"{actual_day:02d}-{birth_month:02d}-{full_year}"
    if day_in_range and month_in_range:
        try:
            # Constructing the date rejects combinations such as 31-02.
            datetime(full_year, birth_month, actual_day)
        except ValueError:
            result['errors'].append(f"Invalid birth date: {birth_date}")

    result['extracted'] = {
        'province_code': province_code,
        'city_code': city_code,
        'district_code': district_code,
        'birth_date': birth_date,
        'gender': gender,
        'sequence': sequence
    }
    result['is_valid'] = not result['errors']
    return result
def extract_ktp_data(
    self,
    image: np.ndarray,
    validate: bool = True
) -> Dict[str, Any]:
    """
    Run the full pipeline on one image: OCR, field parsing, NIK validation.

    Args:
        image: Input image (BGR format)
        validate: Whether to validate the extracted NIK
    Returns:
        Dictionary with ``data`` (parsed fields), ``raw_text`` (one entry per
        OCR result with text, confidence and bbox) and ``validation``
        (``None`` unless validation was requested and a NIK was found)
    """
    ocr_results = self.extract_text(image)
    ktp_data = self.parse_ktp_data(ocr_results)

    raw_entries = []
    for bbox, text, conf in ocr_results:
        raw_entries.append({
            'text': text,
            'confidence': conf,
            'bbox': bbox
        })

    response = {
        'data': ktp_data.to_dict(),
        'raw_text': raw_entries,
        'validation': None
    }

    # Only a found NIK can be validated.
    if not (validate and ktp_data.nik):
        return response
    response['validation'] = {
        'nik': self.validate_nik(ktp_data.nik.value)
    }
    return response
# Global service instance, created at import time. Construction only fills in
# the lookup tables; initialize() must be called before extract_text() will run.
ktp_ocr_service = KTPOCRService()