| from __future__ import annotations |
|
|
| import logging |
| import os |
| import re |
| import sys |
| import tempfile |
| from dataclasses import dataclass, field, fields |
| from enum import Enum |
| from pathlib import Path |
| from typing import Callable, Iterator, Optional, Protocol |
|
|
| import gradio as gr |
| import pandas as pd |
| import torch |
| from PIL import Image |
| from transformers import AutoModelForImageTextToText, AutoProcessor |
|
|
# Log to stderr so records don't interleave with Gradio's stdout output
# (important on hosted Spaces, where stderr is the log stream).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)
logger = logging.getLogger(__name__)
|
|
|
|
| |
| |
| |
|
|
|
|
class ExtractionStatus(Enum):
    """Outcome of parsing one KTP image, based on how many fields were found."""

    SUCCESS = "success"  # every field extracted
    PARTIAL = "partial"  # at least one, but not all, fields extracted
    FAILED = "failed"    # no fields extracted
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class KTPData: |
| """Immutable value object โ extracted KTP fields.""" |
|
|
| nik: Optional[str] = None |
| nama: Optional[str] = None |
| tempat_lahir: Optional[str] = None |
| tanggal_lahir: Optional[str] = None |
|
|
| @property |
| def status(self) -> ExtractionStatus: |
| populated = sum(1 for f in fields(self) if getattr(self, f.name) is not None) |
| if populated == len(fields(self)): |
| return ExtractionStatus.SUCCESS |
| return ExtractionStatus.PARTIAL if populated > 0 else ExtractionStatus.FAILED |
|
|
| def to_dict(self) -> dict[str, Optional[str]]: |
| labels = { |
| "nik": "NIK", |
| "nama": "Nama", |
| "tempat_lahir": "Tempat Lahir", |
| "tanggal_lahir": "Tanggal Lahir", |
| } |
| return {labels[f.name]: getattr(self, f.name) for f in fields(self)} |
|
|
|
|
| @dataclass(frozen=True, slots=True) |
| class ExtractionResult: |
| """Result of processing a single image.""" |
|
|
| filename: str |
| data: KTPData |
| raw_text: str = "" |
| error: Optional[str] = None |
|
|
| def to_row(self) -> dict: |
| return {"Filename": self.filename, **self.data.to_dict(), "Status": self.data.status.value} |
|
|
|
|
| |
| |
| |
|
|
# NIK: the Indonesian national ID number, exactly 16 digits.
_NIK = re.compile(r"\b(\d{16})\b")
# Dates as DD-MM-YYYY or DD/MM/YYYY.
_DATE = re.compile(r"(\d{2}[-/]\d{2}[-/]\d{4})")


# Name extraction, tried in order:
#   1. An explicit "Nama" label followed by an upper-case run, stopping
#      before the next known field (citizenship, religion, or a date).
#   2. Fallback: the upper-case run immediately after the 16-digit NIK.
_NAMA_PATTERNS: list[re.Pattern] = [
    re.compile(
        r"(?:Nama|NAMA)\s*[:/]?\s*([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|WNA|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/])|$)",
        re.IGNORECASE,
    ),
    re.compile(
        r"\b\d{16}\b\s+([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/]))",
        re.IGNORECASE,
    ),
]


# Birthplace extraction, tried in order:
#   1. The labelled "Tempat/Tgl Lahir" form, capturing up to the date.
#   2. Fallback: any upper-case run directly preceding a date.
_TEMPAT_PATTERNS: list[re.Pattern] = [
    re.compile(
        r"(?:Tempat\s*/?\s*Tgl\s*Lahir|TTL)\s*[:/]?\s*([A-Z][A-Za-z\s]+?)(?=\s*[,]?\s*\d{2}[-/])",
        re.IGNORECASE,
    ),
    re.compile(r"([A-Z][A-Z\s]{2,}?)\s*[,]?\s*\d{2}[-/]\d{2}[-/]\d{4}"),
]
|
|
|
|
| def _first_match(patterns: list[re.Pattern], text: str, group: int = 1) -> Optional[str]: |
| for p in patterns: |
| m = p.search(text) |
| if m: |
| return m.group(group).strip().rstrip(",.") |
| return None |
|
|
|
|
def parse_ktp(raw_text: str) -> KTPData:
    """Parse raw OCR text into structured KTP data. Pure, deterministic, testable."""
    # Collapse all whitespace runs so the patterns can anchor on single spaces.
    normalized = " ".join(raw_text.split())
    nik_match = _NIK.search(normalized)
    date_match = _DATE.search(normalized)
    tanggal = None
    if date_match is not None:
        # Normalize DD/MM/YYYY to DD-MM-YYYY.
        tanggal = date_match.group(1).replace("/", "-")
    return KTPData(
        nik=None if nik_match is None else nik_match.group(1),
        nama=_first_match(_NAMA_PATTERNS, normalized),
        tempat_lahir=_first_match(_TEMPAT_PATTERNS, normalized),
        tanggal_lahir=tanggal,
    )
|
|
|
|
| |
| |
| |
|
|
|
|
class OCREngine(Protocol):
    """Structural interface: anything that can turn an image into raw text."""

    def recognize(self, image: Image.Image) -> str: ...
|
|
|
|
@dataclass
class ModelConfig:
    """Tunables for the Hugging Face OCR model."""

    model_path: str = "emisilab/model-ocr-ktp-v1"  # HF Hub repo id
    max_length: int = 1024  # generation cap passed to model.generate
    use_fp16: bool = True   # use half precision when running on CUDA
|
|
|
|
class HuggingFaceOCR:
    """Lazy-loading HF vision-language OCR engine.

    The processor and model are downloaded/initialised on first use
    (see _ensure_loaded), not at construction time, so importing this
    module stays cheap.
    """

    def __init__(self, config: ModelConfig | None = None) -> None:
        self._cfg = config or ModelConfig()
        # Prefer GPU when present; fp16 is only used on CUDA.
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        self._dtype = torch.float16 if (self._cfg.use_fp16 and self._device == "cuda") else torch.float32
        self._processor: AutoProcessor | None = None
        self._model: AutoModelForImageTextToText | None = None

    def _ensure_loaded(self) -> None:
        """Load processor and model once; subsequent calls are no-ops."""
        if self._model is not None:
            return
        logger.info("Loading %s on %s (%s)", self._cfg.model_path, self._device, self._dtype)
        self._processor = AutoProcessor.from_pretrained(self._cfg.model_path, use_fast=True)
        self._model = (
            AutoModelForImageTextToText.from_pretrained(self._cfg.model_path, torch_dtype=self._dtype)
            .to(self._device)
            .eval()
        )
        logger.info("Model ready.")

    @property
    def is_available(self) -> bool:
        """True when the model is (or can be) loaded; logs and returns False on failure."""
        try:
            self._ensure_loaded()
            return True
        except Exception:
            logger.exception("Model unavailable")
            return False

    @torch.inference_mode()
    def recognize(self, image: Image.Image) -> str:
        """Run OCR on *image* and return the decoded text (first batch element)."""
        self._ensure_loaded()
        assert self._processor and self._model  # narrowing after _ensure_loaded
        px = self._processor(images=image, return_tensors="pt").pixel_values.to(
            device=self._device, dtype=self._dtype
        )
        ids = self._model.generate(px, max_length=self._cfg.max_length)
        return self._processor.batch_decode(ids, skip_special_tokens=True)[0]
|
|
|
|
| |
| |
| |
|
|
# Column order for the results table and the exported CSV.
COLUMNS = ["Filename", "NIK", "Nama", "Tempat Lahir", "Tanggal Lahir", "Status"]
# Callback signature: (fraction_complete, status_message).
ProgressCallback = Optional[Callable[[float, str], None]]
|
|
|
|
class ExtractionPipeline:
    """Orchestrates OCR + parsing over one or many image files."""

    def __init__(self, engine: OCREngine) -> None:
        self._engine = engine

    def process_one(self, path: Path) -> ExtractionResult:
        """Process a single image file.

        Never raises: any failure is logged and captured as an
        ExtractionResult with empty data and an error message, so one bad
        file cannot abort a batch.
        """
        try:
            # Use the context manager so the underlying file handle is closed
            # promptly; the bare Image.open(...).convert(...) form leaks the
            # handle until garbage collection.
            with Image.open(path) as img:
                image = img.convert("RGB")
            raw = self._engine.recognize(image)
            return ExtractionResult(filename=path.name, data=parse_ktp(raw), raw_text=raw)
        except Exception as e:
            logger.exception("Failed: %s", path.name)
            return ExtractionResult(filename=path.name, data=KTPData(), error=str(e))

    def process_batch(self, paths: list[Path], on_progress: ProgressCallback = None) -> pd.DataFrame:
        """Process *paths* in order, reporting progress via *on_progress*.

        Returns a DataFrame with the canonical COLUMNS, even when empty.
        """
        total = len(paths)  # hoisted: invariant across iterations
        rows = []
        for i, p in enumerate(paths, 1):
            if on_progress:
                on_progress(i / total, f"Processing {p.name} ({i}/{total})")
            rows.append(self.process_one(p).to_row())
        return pd.DataFrame(rows, columns=COLUMNS) if rows else pd.DataFrame(columns=COLUMNS)
|
|
|
|
| |
| |
| |
|
|
# Module-level singletons shared by all Gradio sessions. The model itself is
# lazy-loaded inside HuggingFaceOCR, so constructing these at import is cheap.
engine = HuggingFaceOCR()
pipeline = ExtractionPipeline(engine)
|
|
|
|
def on_extract(files: list[str] | None, progress: gr.Progress = gr.Progress()):
    """Gradio handler: run the OCR pipeline over the uploaded files.

    Returns a (DataFrame, csv_path) pair feeding the results table and the
    download widget; (empty DataFrame, None) when nothing was uploaded.

    Raises gr.Error when the model cannot be loaded.
    """
    if not files:
        return pd.DataFrame(columns=COLUMNS), None
    if not engine.is_available:
        raise gr.Error("Model failed to load — check Space logs.")

    df = pipeline.process_batch(
        [Path(f) for f in files],
        on_progress=lambda frac, msg: progress(frac, desc=msg),
    )
    # Write each request's CSV to a unique temp file: a fixed name under
    # gettempdir() is shared by every concurrent session, so parallel users
    # would clobber each other's results.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", prefix="ktp_results_", delete=False, newline=""
    ) as tmp:
        df.to_csv(tmp, index=False)
    return df, tmp.name
|
|
|
|
| def on_preview(files: list[str] | None): |
| return [Image.open(f) for f in files] if files else [] |
|
|
|
|
# UI layout: uploads + preview on the left, results + CSV download on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="KTP OCR Extractor") as demo:
    # Fixed mojibake in the user-facing header ("๐ฎ๐ฉ"/"โ" were a broken
    # encoding of the flag emoji and the arrow).
    gr.Markdown(
        "# KTP OCR Extractor 🇮🇩\n"
        "Upload KTP images → extract **NIK, Nama, Tempat Lahir, Tanggal Lahir** automatically."
    )
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload KTP Images",
                file_count="multiple",
                file_types=["image"],
                type="filepath",
            )
            gallery = gr.Gallery(label="Preview", columns=3, height=200)
            extract_btn = gr.Button("Extract", variant="primary", size="lg")

        with gr.Column(scale=2):
            result_table = gr.DataFrame(label="Results", headers=COLUMNS)
            csv_download = gr.File(label="Download CSV")

    # Wiring: preview refreshes on upload; extraction runs on button click.
    file_input.change(on_preview, file_input, gallery)
    extract_btn.click(on_extract, file_input, [result_table, csv_download])


if __name__ == "__main__":
    demo.launch()