File size: 10,297 Bytes
7f9940d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
from __future__ import annotations

import logging
import os
import re
import sys
import tempfile
from dataclasses import dataclass, field, fields
from enum import Enum
from pathlib import Path
from typing import Callable, Iterator, Optional, Protocol

import gradio as gr
import pandas as pd
import torch
from PIL import Image
from transformers import AutoModelForImageTextToText, AutoProcessor

# Configure the root logger: INFO and above, written to stderr, each record
# prefixed with timestamp, level and logger name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    stream=sys.stderr,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# ╔══════════════════════════════════════════════════════════════╗
# ║  DOMAIN MODELS                                               ║
# ╚══════════════════════════════════════════════════════════════╝


class ExtractionStatus(Enum):
    """Outcome of a KTP field extraction, graded by how many fields were found."""

    SUCCESS = "success"  # every field is populated
    PARTIAL = "partial"  # at least one field, but not all
    FAILED = "failed"  # no field is populated


@dataclass(frozen=True, slots=True)
class KTPData:
    """Immutable value object โ€” extracted KTP fields."""

    nik: Optional[str] = None
    nama: Optional[str] = None
    tempat_lahir: Optional[str] = None
    tanggal_lahir: Optional[str] = None

    @property
    def status(self) -> ExtractionStatus:
        populated = sum(1 for f in fields(self) if getattr(self, f.name) is not None)
        if populated == len(fields(self)):
            return ExtractionStatus.SUCCESS
        return ExtractionStatus.PARTIAL if populated > 0 else ExtractionStatus.FAILED

    def to_dict(self) -> dict[str, Optional[str]]:
        labels = {
            "nik": "NIK",
            "nama": "Nama",
            "tempat_lahir": "Tempat Lahir",
            "tanggal_lahir": "Tanggal Lahir",
        }
        return {labels[f.name]: getattr(self, f.name) for f in fields(self)}


@dataclass(frozen=True, slots=True)
class ExtractionResult:
    """Result of processing a single image."""

    filename: str
    data: KTPData
    raw_text: str = ""
    error: Optional[str] = None

    def to_row(self) -> dict:
        return {"Filename": self.filename, **self.data.to_dict(), "Status": self.data.status.value}


# ╔══════════════════════════════════════════════════════════════╗
# ║  PARSER — pure functions, no I/O, no model dependency        ║
# ╚══════════════════════════════════════════════════════════════╝

# A run of exactly 16 digits delimited by word boundaries (captured).
_NIK = re.compile(r"\b(\d{16})\b")
# Two digits, separator, two digits, separator, four digits. Either separator
# may be "-" or "/"; there is no boundary check around the digits.
_DATE = re.compile(r"(\d{2}[-/]\d{2}[-/]\d{4})")

# Name patterns, listed in priority order.
_NAMA_PATTERNS: list[re.Pattern] = [
    # Labelled form: "Nama" followed by an optional ":" or "/". The capture is
    # lazy (letters, whitespace, apostrophes, periods; at least 3 characters)
    # and ends before whitespace plus one of the listed keywords or a
    # two-digit-plus-separator, or at end of text. IGNORECASE makes [A-Z]
    # accept lowercase letters as well.
    re.compile(
        r"(?:Nama|NAMA)\s*[:/]?\s*([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|WNA|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/])|$)",
        re.IGNORECASE,
    ),
    # Unlabelled form: the text right after a 16-digit number.
    # NOTE(review): this stop list omits "WNA" and has no end-of-text
    # alternative, unlike the pattern above — confirm that is intentional.
    re.compile(
        r"\b\d{16}\b\s+([A-Z][A-Z\s'.]{2,}?)"
        r"(?=\s+(?:WNI|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/]))",
        re.IGNORECASE,
    ),
]

# Birthplace patterns, listed in priority order.
_TEMPAT_PATTERNS: list[re.Pattern] = [
    # Labelled form: "Tempat/Tgl Lahir" (flexible spacing, optional slash) or
    # "TTL". Lazily captures letters and whitespace up to an optional comma
    # followed by two digits and a date separator.
    re.compile(
        r"(?:Tempat\s*/?\s*Tgl\s*Lahir|TTL)\s*[:/]?\s*([A-Z][A-Za-z\s]+?)(?=\s*[,]?\s*\d{2}[-/])",
        re.IGNORECASE,
    ),
    # Unlabelled form, case-sensitive: a run of uppercase letters/whitespace
    # immediately before a full date.
    re.compile(r"([A-Z][A-Z\s]{2,}?)\s*[,]?\s*\d{2}[-/]\d{2}[-/]\d{4}"),
]


def _first_match(patterns: list[re.Pattern], text: str, group: int = 1) -> Optional[str]:
    for p in patterns:
        m = p.search(text)
        if m:
            return m.group(group).strip().rstrip(",.")
    return None


def parse_ktp(raw_text: str) -> KTPData:
    """Turn raw OCR text into a ``KTPData`` record.

    All whitespace runs are collapsed to single spaces before matching. Fields
    whose pattern does not match are left as ``None``. A found date has its
    "/" separators rewritten to "-".
    """
    normalized = " ".join(raw_text.split())

    nik_value = None
    nik_hit = _NIK.search(normalized)
    if nik_hit is not None:
        nik_value = nik_hit.group(1)

    birth_date = None
    date_hit = _DATE.search(normalized)
    if date_hit is not None:
        birth_date = date_hit.group(1).replace("/", "-")

    name = _first_match(_NAMA_PATTERNS, normalized)
    birthplace = _first_match(_TEMPAT_PATTERNS, normalized)
    return KTPData(
        nik=nik_value,
        nama=name,
        tempat_lahir=birthplace,
        tanggal_lahir=birth_date,
    )


# ╔══════════════════════════════════════════════════════════════╗
# ║  OCR ENGINE — owns model lifecycle and inference             ║
# ╚══════════════════════════════════════════════════════════════╝


class OCREngine(Protocol):
    """Structural interface for OCR backends: any object with a matching ``recognize`` method."""

    # Takes a PIL image and returns the recognized text.
    def recognize(self, image: Image.Image) -> str: ...


@dataclass
class ModelConfig:
    """Settings consumed by ``HuggingFaceOCR``."""

    # Model identifier handed to ``from_pretrained`` for processor and model.
    model_path: str = "emisilab/model-ocr-ktp-v1"
    # Passed as ``max_length`` to the model's ``generate`` call.
    max_length: int = 1024
    # Request float16 weights; only honoured when the device is CUDA.
    use_fp16: bool = True


class HuggingFaceOCR:
    """Hugging Face image-to-text OCR engine that loads its model on first use."""

    def __init__(self, config: ModelConfig | None = None) -> None:
        """Record configuration and pick device and dtype; nothing is loaded yet.

        Args:
            config: Engine settings; a default ``ModelConfig`` is used when omitted.
        """
        self._processor: AutoProcessor | None = None
        self._model: AutoModelForImageTextToText | None = None
        self._cfg = config if config else ModelConfig()
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        # Half precision is only selected on CUDA; otherwise use float32.
        half_precision = self._cfg.use_fp16 and self._device == "cuda"
        self._dtype = torch.float16 if half_precision else torch.float32

    def _ensure_loaded(self) -> None:
        """Load processor and model once; later calls return immediately."""
        if self._model is None:
            logger.info("Loading %s on %s (%s)", self._cfg.model_path, self._device, self._dtype)
            self._processor = AutoProcessor.from_pretrained(self._cfg.model_path, use_fast=True)
            loaded = AutoModelForImageTextToText.from_pretrained(
                self._cfg.model_path, torch_dtype=self._dtype
            )
            # Move to the target device and switch to eval mode before publishing.
            self._model = loaded.to(self._device).eval()
            logger.info("Model ready.")

    @property
    def is_available(self) -> bool:
        """Whether the model can be loaded.

        Accessing this triggers the load if it has not happened yet. Any
        exception raised while loading is logged and reported as ``False``.
        """
        try:
            self._ensure_loaded()
        except Exception:
            logger.exception("Model unavailable")
            return False
        return True

    @torch.inference_mode()
    def recognize(self, image: Image.Image) -> str:
        """Run OCR on one image and return the first decoded sequence.

        Args:
            image: Image to recognize.

        Returns:
            The decoded text with special tokens skipped.
        """
        self._ensure_loaded()
        assert self._processor and self._model
        encoded = self._processor(images=image, return_tensors="pt")
        pixel_values = encoded.pixel_values.to(device=self._device, dtype=self._dtype)
        token_ids = self._model.generate(pixel_values, max_length=self._cfg.max_length)
        decoded = self._processor.batch_decode(token_ids, skip_special_tokens=True)
        return decoded[0]


# ╔══════════════════════════════════════════════════════════════╗
# ║  PIPELINE — composes engine + parser                         ║
# ╚══════════════════════════════════════════════════════════════╝

# Column order of the result table and exported CSV.
COLUMNS = ["Filename", "NIK", "Nama", "Tempat Lahir", "Tanggal Lahir", "Status"]
# Optional callback receiving (fraction, message) for progress reporting.
ProgressCallback = Optional[Callable[[float, str], None]]


class ExtractionPipeline:
    """Runs an OCR engine over image files and parses the text into KTP rows."""

    def __init__(self, engine: OCREngine) -> None:
        """Store the OCR engine used for every image."""
        self._engine = engine

    def process_one(self, path: Path) -> ExtractionResult:
        """Recognize and parse a single image file.

        Never raises for a bad image: any exception is logged and turned into
        a result with empty data and the error message attached.

        Args:
            path: Image file to process.

        Returns:
            The parsed result, or an empty-data result carrying ``error``.
        """
        try:
            # Close the file handle deterministically. convert() returns a
            # separate, fully loaded image, so it stays usable after the
            # source is closed.
            with Image.open(path) as source:
                image = source.convert("RGB")
            raw = self._engine.recognize(image)
            return ExtractionResult(filename=path.name, data=parse_ktp(raw), raw_text=raw)
        except Exception as e:
            logger.exception("Failed: %s", path.name)
            return ExtractionResult(filename=path.name, data=KTPData(), error=str(e))

    def process_batch(self, paths: list[Path], on_progress: ProgressCallback = None) -> pd.DataFrame:
        """Process every path in order and collect the rows into a table.

        Args:
            paths: Image files to process.
            on_progress: Optional callback, invoked before each file with the
                fraction ``i / len(paths)`` and a human-readable message.

        Returns:
            A DataFrame with the ``COLUMNS`` layout; empty when ``paths`` is.
        """
        total = len(paths)
        rows = []
        for i, p in enumerate(paths, 1):
            if on_progress:
                on_progress(i / total, f"Processing {p.name} ({i}/{total})")
            rows.append(self.process_one(p).to_row())
        return pd.DataFrame(rows, columns=COLUMNS) if rows else pd.DataFrame(columns=COLUMNS)


# ╔══════════════════════════════════════════════════════════════╗
# ║  GRADIO UI — thin presentation layer                         ║
# ╚══════════════════════════════════════════════════════════════╝

# Shared instances created at import time. Constructing the engine does not
# load the model; loading is deferred to its first use.
engine = HuggingFaceOCR()
pipeline = ExtractionPipeline(engine)


def on_extract(files: list[str] | None, progress: gr.Progress = gr.Progress()):
    """Handle the Extract button: run the pipeline and export the table as CSV.

    Args:
        files: Paths of the uploaded images, or ``None``/empty when nothing
            is uploaded.
        progress: Gradio progress tracker, updated once per file.

    Returns:
        A ``(DataFrame, csv_path)`` pair. With no files, an empty table and
        ``None`` instead of a path.

    Raises:
        gr.Error: If the OCR model cannot be loaded.
    """
    if not files:
        return pd.DataFrame(columns=COLUMNS), None
    if not engine.is_available:
        raise gr.Error("Model failed to load — check Space logs.")

    df = pipeline.process_batch(
        [Path(f) for f in files],
        on_progress=lambda frac, msg: progress(frac, desc=msg),
    )
    # Write into a fresh per-request directory. A single fixed path in the
    # shared temp dir would let concurrent requests overwrite each other's
    # export and serve one user's results to another. The file name itself
    # is unchanged. The directories are not removed here.
    csv_path = Path(tempfile.mkdtemp(prefix="ktp_ocr_")) / "ktp_results.csv"
    df.to_csv(csv_path, index=False)
    return df, str(csv_path)


def on_preview(files: list[str] | None):
    """Open each selected file as an image for the preview gallery.

    Returns an empty list when nothing is selected.
    """
    if not files:
        return []
    return [Image.open(selected) for selected in files]


# Two-column layout: upload, preview and trigger on the left; results table
# and CSV download on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="KTP OCR Extractor") as demo:
    gr.Markdown(
        "# KTP OCR Extractor 🇮🇩\n"
        "Upload KTP images → extract **NIK, Nama, Tempat Lahir, Tanggal Lahir** automatically."
    )
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload KTP Images",
                file_count="multiple",
                file_types=["image"],
                type="filepath",
            )
            gallery = gr.Gallery(label="Preview", columns=3, height=200)
            extract_btn = gr.Button("Extract", variant="primary", size="lg")

        with gr.Column(scale=2):
            result_table = gr.DataFrame(label="Results", headers=COLUMNS)
            csv_download = gr.File(label="Download CSV")

    # Refresh the preview whenever the selection changes; extraction runs
    # only on an explicit button click.
    file_input.change(on_preview, file_input, gallery)
    extract_btn.click(on_extract, file_input, [result_table, csv_download])

# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()