# ocr-ktp / app.py — Hugging Face Space (commit 7f9940d)
from __future__ import annotations
import logging
import os
import re
import sys
import tempfile
from dataclasses import dataclass, field, fields
from enum import Enum
from pathlib import Path
from typing import Callable, Iterator, Optional, Protocol
import gradio as gr
import pandas as pd
import torch
from PIL import Image
from transformers import AutoModelForImageTextToText, AutoProcessor
# Configure root logging once at import time; stderr keeps log lines out of stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)
# Module-level logger, shared by the engine and pipeline below.
logger = logging.getLogger(__name__)
# โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
# โ•‘ DOMAIN MODELS โ•‘
# โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class ExtractionStatus(Enum):
    """Coarse outcome of one KTP extraction attempt."""

    SUCCESS = "success"  # every field was populated
    PARTIAL = "partial"  # at least one, but not all, fields were populated
    FAILED = "failed"    # no field could be extracted
@dataclass(frozen=True, slots=True)
class KTPData:
"""Immutable value object โ€” extracted KTP fields."""
nik: Optional[str] = None
nama: Optional[str] = None
tempat_lahir: Optional[str] = None
tanggal_lahir: Optional[str] = None
@property
def status(self) -> ExtractionStatus:
populated = sum(1 for f in fields(self) if getattr(self, f.name) is not None)
if populated == len(fields(self)):
return ExtractionStatus.SUCCESS
return ExtractionStatus.PARTIAL if populated > 0 else ExtractionStatus.FAILED
def to_dict(self) -> dict[str, Optional[str]]:
labels = {
"nik": "NIK",
"nama": "Nama",
"tempat_lahir": "Tempat Lahir",
"tanggal_lahir": "Tanggal Lahir",
}
return {labels[f.name]: getattr(self, f.name) for f in fields(self)}
@dataclass(frozen=True, slots=True)
class ExtractionResult:
    """Outcome of running OCR + parsing on a single image file."""

    filename: str
    data: KTPData
    raw_text: str = ""
    error: Optional[str] = None

    def to_row(self) -> dict:
        """Flatten into one table row: filename, labelled fields, then status."""
        row: dict = {"Filename": self.filename}
        row.update(self.data.to_dict())
        row["Status"] = self.data.status.value
        return row
# โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
# โ•‘ PARSER โ€” pure functions, no I/O, no model dependency โ•‘
# โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# A NIK (Nomor Induk Kependudukan) is a 16-digit national identity number.
_NIK = re.compile(r"\b(\d{16})\b")
# Dates as printed on a KTP: DD-MM-YYYY or DD/MM/YYYY.
_DATE = re.compile(r"(\d{2}[-/]\d{2}[-/]\d{4})")
# Name extraction, tried in order by _first_match below.
_NAMA_PATTERNS: list[re.Pattern] = [
    # Preferred: the value after a "Nama" label, stopping before the next
    # field (citizenship, religion, or a date) via lookahead.
    re.compile(
        r"(?:Nama|NAMA)\s*[:/]?\s*([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|WNA|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/])|$)",
        re.IGNORECASE,
    ),
    # Fallback for OCR output that dropped the label: the uppercase run that
    # immediately follows the 16-digit NIK.
    re.compile(
        r"\b\d{16}\b\s+([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/]))",
        re.IGNORECASE,
    ),
]
# Birthplace extraction, tried in order.
_TEMPAT_PATTERNS: list[re.Pattern] = [
    # Preferred: the value after a "Tempat/Tgl Lahir" (or "TTL") label,
    # stopping just before the birth date.
    re.compile(
        r"(?:Tempat\s*/?\s*Tgl\s*Lahir|TTL)\s*[:/]?\s*([A-Z][A-Za-z\s]+?)(?=\s*[,]?\s*\d{2}[-/])",
        re.IGNORECASE,
    ),
    # Fallback: any uppercase run directly preceding a full date.
    re.compile(r"([A-Z][A-Z\s]{2,}?)\s*[,]?\s*\d{2}[-/]\d{2}[-/]\d{4}"),
]
def _first_match(patterns: list[re.Pattern], text: str, group: int = 1) -> Optional[str]:
for p in patterns:
m = p.search(text)
if m:
return m.group(group).strip().rstrip(",.")
return None
def parse_ktp(raw_text: str) -> KTPData:
    """Parse raw OCR text into structured KTP data.

    Pure and deterministic: all whitespace is collapsed to single spaces,
    then the module-level regexes are applied. Fields that cannot be found
    come back as None; date separators are normalized to dashes.
    """
    text = " ".join(raw_text.split())
    nik_match = _NIK.search(text)
    date_match = _DATE.search(text)
    tanggal = date_match.group(1).replace("/", "-") if date_match else None
    return KTPData(
        nik=None if nik_match is None else nik_match.group(1),
        nama=_first_match(_NAMA_PATTERNS, text),
        tempat_lahir=_first_match(_TEMPAT_PATTERNS, text),
        tanggal_lahir=tanggal,
    )
# โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
# โ•‘ OCR ENGINE โ€” owns model lifecycle and inference โ•‘
# โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
class OCREngine(Protocol):
    """Structural interface: anything that turns a PIL image into raw OCR text."""

    def recognize(self, image: Image.Image) -> str: ...
@dataclass
class ModelConfig:
    """Tunables for the Hugging Face OCR model."""

    # Hub repo id of the KTP OCR checkpoint to load.
    model_path: str = "emisilab/model-ocr-ktp-v1"
    # Upper bound passed to `generate(max_length=...)`.
    max_length: int = 1024
    # Use float16 weights/inputs; only takes effect when CUDA is available.
    use_fp16: bool = True
class HuggingFaceOCR:
    """Lazy-loading HF vision-language OCR engine.

    Construction is cheap: the processor/model pair is only downloaded and
    moved to the device on first use (``recognize`` or ``is_available``).
    """

    def __init__(self, config: ModelConfig | None = None) -> None:
        self._cfg = config or ModelConfig()
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        # fp16 is only requested when running on CUDA.
        self._dtype = torch.float16 if (self._cfg.use_fp16 and self._device == "cuda") else torch.float32
        self._processor: AutoProcessor | None = None
        self._model: AutoModelForImageTextToText | None = None

    def _ensure_loaded(self) -> None:
        """Instantiate processor and model once; no-op on later calls."""
        if self._model is not None:
            return
        logger.info("Loading %s on %s (%s)", self._cfg.model_path, self._device, self._dtype)
        self._processor = AutoProcessor.from_pretrained(self._cfg.model_path, use_fast=True)
        self._model = (
            AutoModelForImageTextToText.from_pretrained(self._cfg.model_path, torch_dtype=self._dtype)
            .to(self._device)
            .eval()
        )
        logger.info("Model ready.")

    @property
    def is_available(self) -> bool:
        """True when the model is (or can be) loaded; logs and returns False on failure."""
        try:
            self._ensure_loaded()
            return True
        except Exception:
            # Boundary catch: surface the failure in the logs instead of crashing the UI.
            logger.exception("Model unavailable")
            return False

    @torch.inference_mode()
    def recognize(self, image: Image.Image) -> str:
        """Run OCR on *image* and return the decoded text.

        Raises:
            RuntimeError: if the model/processor did not load.
        """
        self._ensure_loaded()
        # Explicit check instead of `assert`: asserts are stripped under `python -O`.
        if self._processor is None or self._model is None:
            raise RuntimeError("OCR model is not loaded")
        px = self._processor(images=image, return_tensors="pt").pixel_values.to(
            device=self._device, dtype=self._dtype
        )
        ids = self._model.generate(px, max_length=self._cfg.max_length)
        return self._processor.batch_decode(ids, skip_special_tokens=True)[0]
# โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
# โ•‘ PIPELINE โ€” composes engine + parser โ•‘
# โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Column order shared by the results table and the CSV export.
COLUMNS = ["Filename", "NIK", "Nama", "Tempat Lahir", "Tanggal Lahir", "Status"]
# Progress callback signature: (fraction_complete in [0, 1], human-readable message).
ProgressCallback = Optional[Callable[[float, str], None]]
class ExtractionPipeline:
    """Composes an OCR engine with the pure parser; handles batching and errors."""

    def __init__(self, engine: OCREngine) -> None:
        self._engine = engine

    def process_one(self, path: Path) -> ExtractionResult:
        """OCR + parse a single image.

        Never raises: any failure is logged and returned as an error result
        with empty KTP data, so a bad file cannot abort a batch.
        """
        try:
            # Context manager closes the underlying file handle (the original
            # leaked it); convert("RGB") forces a full decode first, so the
            # pixel data survives the close.
            with Image.open(path) as img:
                image = img.convert("RGB")
            raw = self._engine.recognize(image)
            return ExtractionResult(filename=path.name, data=parse_ktp(raw), raw_text=raw)
        except Exception as e:
            logger.exception("Failed: %s", path.name)
            return ExtractionResult(filename=path.name, data=KTPData(), error=str(e))

    def process_batch(self, paths: list[Path], on_progress: ProgressCallback = None) -> pd.DataFrame:
        """Process *paths* in order, reporting progress, and return one row per file."""
        total = len(paths)
        rows = []
        for i, p in enumerate(paths, 1):
            if on_progress:
                on_progress(i / total, f"Processing {p.name} ({i}/{total})")
            rows.append(self.process_one(p).to_row())
        # Pin the column order explicitly, including for the empty case.
        return pd.DataFrame(rows, columns=COLUMNS) if rows else pd.DataFrame(columns=COLUMNS)
# โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
# โ•‘ GRADIO UI โ€” thin presentation layer โ•‘
# โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Module-level singletons: one lazily-loaded engine/pipeline shared by all requests.
engine = HuggingFaceOCR()
pipeline = ExtractionPipeline(engine)
def on_extract(files: list[str] | None, progress: gr.Progress = gr.Progress()):
    """Gradio handler: run the pipeline over the uploads.

    Returns (results DataFrame, path to a CSV of the same data), or an empty
    table and no CSV when nothing was uploaded.

    Raises:
        gr.Error: when the OCR model cannot be loaded.
    """
    if not files:
        return pd.DataFrame(columns=COLUMNS), None
    if not engine.is_available:
        raise gr.Error("Model failed to load — check Space logs.")
    df = pipeline.process_batch(
        [Path(f) for f in files],
        on_progress=lambda frac, msg: progress(frac, desc=msg),
    )
    # Unique file per request: the original wrote a fixed name into the shared
    # temp dir, letting concurrent sessions overwrite (and download) each
    # other's results.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", prefix="ktp_results_", delete=False
    ) as tmp:
        df.to_csv(tmp, index=False)
        csv_path = tmp.name
    return df, csv_path
def on_preview(files: list[str] | None):
return [Image.open(f) for f in files] if files else []
# UI layout and wiring. Fixes mojibake in the user-facing Markdown string
# (flag emoji and arrow were UTF-8 mis-decoded in the original).
with gr.Blocks(theme=gr.themes.Soft(), title="KTP OCR Extractor") as demo:
    gr.Markdown(
        "# KTP OCR Extractor 🇮🇩\n"
        "Upload KTP images → extract **NIK, Nama, Tempat Lahir, Tanggal Lahir** automatically."
    )
    with gr.Row():
        # Left column: upload, preview, and the trigger button.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload KTP Images",
                file_count="multiple",
                file_types=["image"],
                type="filepath",  # handlers receive plain path strings
            )
            gallery = gr.Gallery(label="Preview", columns=3, height=200)
            extract_btn = gr.Button("Extract", variant="primary", size="lg")
        # Right column: results table plus CSV download.
        with gr.Column(scale=2):
            result_table = gr.DataFrame(label="Results", headers=COLUMNS)
            csv_download = gr.File(label="Download CSV")
    # Wiring: preview refreshes whenever the upload changes; extraction runs on click.
    file_input.change(on_preview, file_input, gallery)
    extract_btn.click(on_extract, file_input, [result_table, csv_download])
if __name__ == "__main__":
    # Script entry point: start the Gradio server.
    demo.launch()