icanq committed on
Commit
7f9940d
·
verified ·
1 Parent(s): 7a404f9

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +266 -0
app.py ADDED
@@ -0,0 +1,266 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import os
5
+ import re
6
+ import sys
7
+ import tempfile
8
+ from dataclasses import dataclass, field, fields
9
+ from enum import Enum
10
+ from pathlib import Path
11
+ from typing import Callable, Iterator, Optional, Protocol
12
+
13
+ import gradio as gr
14
+ import pandas as pd
15
+ import torch
16
+ from PIL import Image
17
+ from transformers import AutoModelForImageTextToText, AutoProcessor
18
+
19
# Configure root logging once at import: INFO and above, written to stderr
# with a timestamp, level and logger name in front of each message.
logging.basicConfig(
    stream=sys.stderr,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything in this file.
logger = logging.getLogger(__name__)
25
+
26
+
27
+ # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
28
+ # โ•‘ DOMAIN MODELS โ•‘
29
+ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
30
+
31
+
32
class ExtractionStatus(Enum):
    """Outcome label attached to one extraction; the value is the text shown in result rows."""

    SUCCESS = "success"  # every field was found
    PARTIAL = "partial"  # at least one field, but not all
    FAILED = "failed"  # no field was found
36
+
37
+
38
+ @dataclass(frozen=True, slots=True)
39
+ class KTPData:
40
+ """Immutable value object โ€” extracted KTP fields."""
41
+
42
+ nik: Optional[str] = None
43
+ nama: Optional[str] = None
44
+ tempat_lahir: Optional[str] = None
45
+ tanggal_lahir: Optional[str] = None
46
+
47
+ @property
48
+ def status(self) -> ExtractionStatus:
49
+ populated = sum(1 for f in fields(self) if getattr(self, f.name) is not None)
50
+ if populated == len(fields(self)):
51
+ return ExtractionStatus.SUCCESS
52
+ return ExtractionStatus.PARTIAL if populated > 0 else ExtractionStatus.FAILED
53
+
54
+ def to_dict(self) -> dict[str, Optional[str]]:
55
+ labels = {
56
+ "nik": "NIK",
57
+ "nama": "Nama",
58
+ "tempat_lahir": "Tempat Lahir",
59
+ "tanggal_lahir": "Tanggal Lahir",
60
+ }
61
+ return {labels[f.name]: getattr(self, f.name) for f in fields(self)}
62
+
63
+
64
+ @dataclass(frozen=True, slots=True)
65
+ class ExtractionResult:
66
+ """Result of processing a single image."""
67
+
68
+ filename: str
69
+ data: KTPData
70
+ raw_text: str = ""
71
+ error: Optional[str] = None
72
+
73
+ def to_row(self) -> dict:
74
+ return {"Filename": self.filename, **self.data.to_dict(), "Status": self.data.status.value}
75
+
76
+
77
+ # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
78
+ # โ•‘ PARSER โ€” pure functions, no I/O, no model dependency โ•‘
79
+ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
80
+
81
+ _NIK = re.compile(r"\b(\d{16})\b")
82
+ _DATE = re.compile(r"(\d{2}[-/]\d{2}[-/]\d{4})")
83
+
84
+ _NAMA_PATTERNS: list[re.Pattern] = [
85
+ re.compile(
86
+ r"(?:Nama|NAMA)\s*[:/]?\s*([A-Z][A-Z\s'.]{2,}?)"
87
+ r"(?=\s+(?:WNI|WNA|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/])|$)",
88
+ re.IGNORECASE,
89
+ ),
90
+ re.compile(
91
+ r"\b\d{16}\b\s+([A-Z][A-Z\s'.]{2,}?)"
92
+ r"(?=\s+(?:WNI|ISLAM|KRISTEN|KATOLIK|HINDU|BUDHA|KONGHUCU|\d{2}[-/]))",
93
+ re.IGNORECASE,
94
+ ),
95
+ ]
96
+
97
+ _TEMPAT_PATTERNS: list[re.Pattern] = [
98
+ re.compile(
99
+ r"(?:Tempat\s*/?\s*Tgl\s*Lahir|TTL)\s*[:/]?\s*([A-Z][A-Za-z\s]+?)(?=\s*[,]?\s*\d{2}[-/])",
100
+ re.IGNORECASE,
101
+ ),
102
+ re.compile(r"([A-Z][A-Z\s]{2,}?)\s*[,]?\s*\d{2}[-/]\d{2}[-/]\d{4}"),
103
+ ]
104
+
105
+
106
+ def _first_match(patterns: list[re.Pattern], text: str, group: int = 1) -> Optional[str]:
107
+ for p in patterns:
108
+ m = p.search(text)
109
+ if m:
110
+ return m.group(group).strip().rstrip(",.")
111
+ return None
112
+
113
+
114
def parse_ktp(raw_text: str) -> KTPData:
    """Turn raw OCR text into a ``KTPData`` record.

    Whitespace runs (including newlines) are collapsed to single spaces before
    matching. Fields that are not found are left as ``None``; a found date has
    "/" separators rewritten to "-".
    """
    normalized = " ".join(raw_text.split())

    nik_match = _NIK.search(normalized)
    nik_value = None if nik_match is None else nik_match.group(1)

    name_value = _first_match(_NAMA_PATTERNS, normalized)
    place_value = _first_match(_TEMPAT_PATTERNS, normalized)

    date_match = _DATE.search(normalized)
    birth_date = None
    if date_match is not None:
        birth_date = date_match.group(1).replace("/", "-")

    return KTPData(
        nik=nik_value,
        nama=name_value,
        tempat_lahir=place_value,
        tanggal_lahir=birth_date,
    )
125
+
126
+
127
+ # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
128
+ # โ•‘ OCR ENGINE โ€” owns model lifecycle and inference โ•‘
129
+ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
130
+
131
+
132
class OCREngine(Protocol):
    """Structural interface for any object that can read text from an image."""

    def recognize(self, image: Image.Image) -> str:
        """Return the text recognized in ``image``."""
        ...
134
+
135
+
136
@dataclass
class ModelConfig:
    """Settings for the Hugging Face OCR engine."""

    # Model identifier handed to ``from_pretrained``.
    model_path: str = "emisilab/model-ocr-ktp-v1"
    # Passed as ``max_length`` to ``generate``.
    max_length: int = 1024
    # Request float16 weights; only honoured when CUDA is available.
    use_fp16: bool = True
141
+
142
+
143
class HuggingFaceOCR:
    """OCR engine backed by a Hugging Face image-text-to-text model.

    Construction only picks the device and dtype; the processor and model are
    loaded the first time they are needed.
    """

    def __init__(self, config: ModelConfig | None = None) -> None:
        self._cfg = config or ModelConfig()
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        # Half precision is used only when requested AND running on CUDA.
        if self._cfg.use_fp16 and self._device == "cuda":
            self._dtype = torch.float16
        else:
            self._dtype = torch.float32
        self._processor: AutoProcessor | None = None
        self._model: AutoModelForImageTextToText | None = None

    def _ensure_loaded(self) -> None:
        """Load processor and model on first call; later calls return immediately."""
        if self._model is None:
            logger.info("Loading %s on %s (%s)", self._cfg.model_path, self._device, self._dtype)
            self._processor = AutoProcessor.from_pretrained(self._cfg.model_path, use_fast=True)
            loaded = AutoModelForImageTextToText.from_pretrained(
                self._cfg.model_path, torch_dtype=self._dtype
            )
            # Assign only after the move to the device and eval() both succeed,
            # so a failure leaves the engine in the "not loaded" state.
            self._model = loaded.to(self._device).eval()
            logger.info("Model ready.")

    @property
    def is_available(self) -> bool:
        """Whether the model can be loaded. Triggers loading; failures are logged."""
        try:
            self._ensure_loaded()
        except Exception:
            logger.exception("Model unavailable")
            return False
        return True

    @torch.inference_mode()
    def recognize(self, image: Image.Image) -> str:
        """Run the model on ``image`` and return the first decoded sequence."""
        self._ensure_loaded()
        assert self._processor and self._model
        encoded = self._processor(images=image, return_tensors="pt")
        pixel_values = encoded.pixel_values.to(device=self._device, dtype=self._dtype)
        generated = self._model.generate(pixel_values, max_length=self._cfg.max_length)
        decoded = self._processor.batch_decode(generated, skip_special_tokens=True)
        return decoded[0]
183
+
184
+
185
+ # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
186
+ # โ•‘ PIPELINE โ€” composes engine + parser โ•‘
187
+ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
188
+
189
# Column order of the results table (and of the CSV written from it).
COLUMNS = [
    "Filename",
    "NIK",
    "Nama",
    "Tempat Lahir",
    "Tanggal Lahir",
    "Status",
]

# Optional hook invoked as ``callback(fraction_done, message)`` during a batch.
ProgressCallback = Optional[Callable[[float, str], None]]
191
+
192
+
193
class ExtractionPipeline:
    """Runs an OCR engine over image files and parses the text into KTP rows."""

    def __init__(self, engine: OCREngine) -> None:
        self._engine = engine

    def process_one(self, path: Path) -> ExtractionResult:
        """Process a single image file.

        Never raises: any failure (unreadable image, OCR error, parse error) is
        logged and returned as a result with empty data and ``error`` set to
        the exception text.
        """
        try:
            # Image.open() keeps the file open until the pixel data is read.
            # Converting inside the context manager reads the data and then
            # closes the handle instead of leaving it to the garbage collector.
            with Image.open(path) as source:
                image = source.convert("RGB")
            raw = self._engine.recognize(image)
            return ExtractionResult(filename=path.name, data=parse_ktp(raw), raw_text=raw)
        except Exception as exc:
            logger.exception("Failed: %s", path.name)
            return ExtractionResult(filename=path.name, data=KTPData(), error=str(exc))

    def process_batch(self, paths: list[Path], on_progress: ProgressCallback = None) -> pd.DataFrame:
        """Process ``paths`` in order and return one table row per file.

        Args:
            paths: Image files to process.
            on_progress: Optional callback, called before each file with the
                fraction ``index / total`` (1-based index) and a status message.

        Returns:
            A DataFrame with the ``COLUMNS`` columns; empty when ``paths`` is empty.
        """
        total = len(paths)
        rows = []
        for index, path in enumerate(paths, 1):
            if on_progress:
                on_progress(index / total, f"Processing {path.name} ({index}/{total})")
            rows.append(self.process_one(path).to_row())
        return pd.DataFrame(rows, columns=COLUMNS) if rows else pd.DataFrame(columns=COLUMNS)
213
+
214
+
215
+ # โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
216
+ # โ•‘ GRADIO UI โ€” thin presentation layer โ•‘
217
+ # โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
218
+
219
# Module-level singletons shared by the UI callbacks. Constructing the engine
# does not load the model; HuggingFaceOCR loads it on first use.
engine = HuggingFaceOCR()
pipeline = ExtractionPipeline(engine=engine)
221
+
222
+
223
def on_extract(files: list[str] | None, progress: gr.Progress = gr.Progress()):
    """Gradio callback: run extraction on the uploaded files.

    Args:
        files: Paths of the uploaded images, or ``None``/empty when nothing is selected.
        progress: Gradio progress tracker, updated once per file.

    Returns:
        ``(results_dataframe, csv_path)``; ``csv_path`` is ``None`` when there
        are no files.

    Raises:
        gr.Error: If the OCR model cannot be loaded.
    """
    if not files:
        return pd.DataFrame(columns=COLUMNS), None
    if not engine.is_available:
        raise gr.Error("Model failed to load — check Space logs.")

    def report(frac: float, msg: str) -> None:
        progress(frac, desc=msg)

    df = pipeline.process_batch([Path(f) for f in files], on_progress=report)

    # Write each request's results to its own uniquely named file. A single
    # fixed path would be shared by every request, so concurrent users could
    # overwrite or download one another's results. The file is not deleted here.
    fd, csv_path = tempfile.mkstemp(prefix="ktp_results_", suffix=".csv")
    os.close(fd)  # pandas reopens the file by path
    df.to_csv(csv_path, index=False)
    return df, csv_path
236
+
237
+
238
def on_preview(files: list[str] | None):
    """Gradio callback: return the uploaded images for the preview gallery.

    Args:
        files: Paths of the uploaded images, or ``None``/empty when nothing is selected.

    Returns:
        A list of in-memory PIL images, one per path; empty when there are no files.
    """
    if not files:
        return []
    previews = []
    for name in files:
        # Image.open() leaves the file open; copy() reads the pixels into a
        # detached image so the handle can be closed as soon as the block exits.
        with Image.open(name) as source:
            previews.append(source.copy())
    return previews
240
+
241
+
242
# Build the UI. The header text previously contained mis-encoded characters
# (flag emoji and arrow); they are restored here.
with gr.Blocks(theme=gr.themes.Soft(), title="KTP OCR Extractor") as demo:
    gr.Markdown(
        "# KTP OCR Extractor 🇮🇩\n"
        "Upload KTP images → extract **NIK, Nama, Tempat Lahir, Tanggal Lahir** automatically."
    )
    with gr.Row():
        # Left column: upload, preview and the action button.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload KTP Images",
                file_count="multiple",
                file_types=["image"],
                type="filepath",
            )
            gallery = gr.Gallery(label="Preview", columns=3, height=200)
            extract_btn = gr.Button("Extract", variant="primary", size="lg")

        # Right column: extracted table and the CSV download.
        with gr.Column(scale=2):
            result_table = gr.DataFrame(label="Results", headers=COLUMNS)
            csv_download = gr.File(label="Download CSV")

    # Refresh the gallery whenever the selected files change.
    file_input.change(on_preview, file_input, gallery)
    # Run extraction on click and fill both the table and the download slot.
    extract_btn.click(on_extract, file_input, [result_table, csv_download])

if __name__ == "__main__":
    demo.launch()