Spaces:
Sleeping
Sleeping
# Workaround for a gradio_client bug: a JSON schema that is a bare bool (not a
# dict) crashes API generation, so such schemas are reported as "any".
import gradio_client.utils as _gcu

_orig_schema = _gcu._json_schema_to_python_type


def _safe_schema(schema, defs=None):
    """Convert *schema* with the original converter, tolerating non-dict schemas.

    Dict schemas are delegated unchanged to the original gradio_client
    function; anything else yields the type name "any".
    """
    if isinstance(schema, dict):
        return _orig_schema(schema, defs)
    return "any"


_gcu._json_schema_to_python_type = _safe_schema
| # --------------------------------------------------------------------------- | |
| import re | |
| import json | |
| import numpy as np | |
| import cv2 | |
| import gradio as gr | |
| import spaces | |
| from PIL import Image | |
| from huggingface_hub import hf_hub_download | |
| # --------------------------------------------------------------------------- | |
# Constants (inlined from config.py because the Space cannot import local modules)
| # --------------------------------------------------------------------------- | |
# Hugging Face Hub repository the model files are downloaded from.
HF_REPO_ID = "NeoCode77/notepay-models"

# Detection class names; a YOLO class id is used as an index into this list.
YOLO_CLASSES = ["line_item", "nama_toko", "tanggal_waktu", "total_belanja"]

# Character set of the text recogniser. Index 0 is reserved for the CTC blank,
# so real characters are numbered from 1.
CHARACTERS = [*"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz .,/-()%"]
IDX_TO_CHAR = dict(enumerate(CHARACTERS, start=1))
IDX_TO_CHAR[0] = ""  # CTC blank

# Size (in pixels) of the crops fed to the text recogniser.
CROP_HEIGHT = 32
CROP_WIDTH = 512

# Category labels; the classifier's arg-max output indexes into this list.
EXPENSE_CATEGORIES = [
    "Makanan & Minuman",
    "Kebersihan & Perawatan",
    "Rumah Tangga",
    "Kesehatan & Farmasi",
    "Elektronik & Pulsa",
    "Pakaian & Aksesori",
    "Lain-lain",
]

# Drawing colour per detection class, passed to OpenCV drawing calls.
CLASS_COLORS = {
    "nama_toko": (255, 100, 50),
    "line_item": (50, 200, 50),
    "tanggal_waktu": (50, 150, 255),
    "total_belanja": (0, 50, 255),
}
| # --------------------------------------------------------------------------- | |
| # Load model (sekali saat startup) | |
| # --------------------------------------------------------------------------- | |
# Download the three model files from the Hugging Face Hub. hf_hub_download
# returns the local path of each downloaded file.
print("Mendownload model YOLO...")
_yolo_path = hf_hub_download(repo_id=HF_REPO_ID, filename="yolo/best.pt")
print("Mendownload model CRNN...")
_crnn_path = hf_hub_download(repo_id=HF_REPO_ID, filename="crnn/inference_model.keras")
print("Mendownload model Classifier...")
_clf_path = hf_hub_download(repo_id=HF_REPO_ID, filename="classifier/classifier_model.keras")

# TensorFlow / Keras are imported only after the downloads.
# NOTE(review): the original comment here mentioned enabling memory growth to
# avoid OOM, but no such configuration is performed; the only TF setting
# applied below is lowering the logger level to ERROR.
import tensorflow as tf
import keras
tf.get_logger().setLevel("ERROR")

print("Loading YOLO...")
from ultralytics import YOLO as _YOLO
yolo_model = _YOLO(_yolo_path)

print("Loading CRNN...")
# Keras 3.x workaround: Lambda.compute_output_shape sometimes raises
# NotImplementedError. The patch falls back to returning the input shape
# unchanged. It must be installed before the CRNN model is loaded below.
try:
    from keras.src.layers.core.lambda_layer import Lambda as _KLambda
    _orig_cos = _KLambda.compute_output_shape
    def _patched_cos(self, input_shape):
        # Prefer the original implementation; only fall back when it gives up.
        try:
            return _orig_cos(self, input_shape)
        except NotImplementedError:
            return input_shape
    _KLambda.compute_output_shape = _patched_cos
except ImportError:
    # The private module path is not available in this Keras version;
    # continue without the patch.
    pass
# NOTE(review): safe_mode=False allows unsafe Lambda-layer deserialization,
# i.e. code stored in the model file may be executed on load. This is only
# acceptable if HF_REPO_ID is a trusted repository; confirm.
crnn_model = keras.models.load_model(_crnn_path, compile=False, safe_mode=False)
print("Loading Classifier...")
clf_model = keras.models.load_model(_clf_path, compile=False)
print("Semua model siap!")
| # --------------------------------------------------------------------------- | |
| # Fungsi image processing (inline dari inference.py) | |
| # --------------------------------------------------------------------------- | |
| def _order_quad(pts): | |
| pts = pts.reshape(4, 2) | |
| rect = np.zeros((4, 2), dtype=np.float32) | |
| s = pts.sum(axis=1) | |
| rect[0] = pts[np.argmin(s)] | |
| rect[2] = pts[np.argmax(s)] | |
| diff = np.diff(pts, axis=1) | |
| rect[1] = pts[np.argmin(diff)] | |
| rect[3] = pts[np.argmax(diff)] | |
| return rect | |
def deskew_crop(image, quad, out_h=CROP_HEIGHT, out_w=CROP_WIDTH):
    """Warp the quadrilateral *quad* of *image* onto an out_w x out_h rectangle.

    The corners are first put into the fixed order produced by _order_quad and
    then mapped to the rectangle corners (0, 0), (out_w, 0), (out_w, out_h),
    (0, out_h) with a perspective transform.
    """
    target = np.array(
        [[0, 0], [out_w, 0], [out_w, out_h], [0, out_h]],
        dtype=np.float32,
    )
    transform = cv2.getPerspectiveTransform(_order_quad(quad), target)
    return cv2.warpPerspective(image, transform, (out_w, out_h))
def preprocess_crop(crop):
    """Turn an image crop into the normalised tensor fed to the text recogniser.

    Steps: convert to grayscale (if the crop has a channel axis), trim to the
    bounding box of pixels brighter than 10, binarise with a Gaussian adaptive
    threshold, then fit into CROP_HEIGHT x CROP_WIDTH. Narrow crops keep their
    aspect ratio and are right-padded with white; crops that would be too wide
    are resized directly to the target size.

    Returns:
        float32 array of shape (1, CROP_HEIGHT, CROP_WIDTH, 1), values in [0, 1].
    """
    gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if crop.ndim == 3 else crop

    # Trim to the region that contains any pixel brighter than 10
    # (presumably removing the black fill left by the perspective warp).
    _, mask = cv2.threshold(gray, 10, 255, cv2.THRESH_BINARY)
    coords = cv2.findNonZero(mask)
    if coords is not None:
        x, y, w, h = cv2.boundingRect(coords)
        gray = gray[y:y + h, x:x + w]

    binary = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 10
    )

    h, w = binary.shape
    new_w = max(1, int(w * CROP_HEIGHT / h))
    if new_w >= CROP_WIDTH:
        # Too wide to pad: resize straight to the target size.
        out = cv2.resize(binary, (CROP_WIDTH, CROP_HEIGHT), interpolation=cv2.INTER_AREA)
    else:
        # The aspect-preserving resize is only needed on this branch, so it is
        # computed here rather than unconditionally.
        resized = cv2.resize(binary, (new_w, CROP_HEIGHT), interpolation=cv2.INTER_AREA)
        pad = np.full((CROP_HEIGHT, CROP_WIDTH - new_w), 255, dtype=np.uint8)
        out = np.hstack([resized, pad])

    # Add batch and channel axes and scale to [0, 1].
    return out.astype(np.float32)[np.newaxis, :, :, np.newaxis] / 255.0
def ctc_decode(logits):
    """Greedy CTC decoding of *logits* into a string.

    Takes the arg-max label along the last axis for every step, collapses
    consecutive repeats, drops the blank label (0) and maps the remaining
    labels through IDX_TO_CHAR (unknown labels become an empty string).
    """
    best_path = np.argmax(logits, axis=-1)
    decoded = []
    last = -1
    for label in best_path:
        repeated = label == last
        last = label
        if repeated or label == 0:
            continue
        decoded.append(IDX_TO_CHAR.get(int(label), ""))
    return "".join(decoded)
def parse_amount(text):
    """Parse a monetary amount written in Indonesian style into a float.

    Everything except digits, '.' and ',' is discarded. Dots are always treated
    as thousands separators. A trailing comma followed by one or two digits is
    treated as the decimal mark; any other comma is dropped.

    Returns:
        The amount as a float, or None if *text* is empty or not parseable.
    """
    if not text:
        return None

    digits = re.sub(r"[^\d.,]", "", text)
    if not digits:
        return None

    has_decimal_comma = re.search(r",\d{1,2}$", digits) is not None
    comma_replacement = "." if has_decimal_comma else ""
    normalized = digits.replace(".", "").replace(",", comma_replacement)

    try:
        return float(normalized)
    except ValueError:
        return None
def parse_datetime(text):
    """Extract a date (and optional HH:MM time) from *text* in ISO-like form.

    Numeric layouts are tried first, in this order:
      1. DD/MM/YYYY HH:MM[:SS]  -> "YYYY-MM-DD HH:MM"
      2. YYYY/MM/DD HH:MM       -> "YYYY-MM-DD HH:MM"
      3. DD/MM/YYYY             -> "YYYY-MM-DD"
    ('/', '-' and '.' are all accepted as separators.)

    If none matches, a day / Indonesian month name / year date such as
    "12 Mei 2024" or "5 AGUSTUS 2023 09:15" is recognised through the month
    table. Only the first three letters of the month word are looked up.

    Returns:
        The formatted string, or None when *text* is empty or has no
        recognisable date.
    """
    if not text:
        return None

    month_numbers = {
        "JAN": 1, "FEB": 2, "MAR": 3, "APR": 4, "MEI": 5, "JUN": 6,
        "JUL": 7, "AGU": 8, "SEP": 9, "OKT": 10, "NOV": 11, "DES": 12,
    }
    # (regex, output template); template indices refer to the regex groups.
    numeric_patterns = [
        (r"(\d{2})[/\-.](\d{2})[/\-.](\d{4})\s+(\d{2}:\d{2})(?::\d{2})?",
         "{2}-{1}-{0} {3}"),
        (r"(\d{4})[/\-.](\d{2})[/\-.](\d{2})\s+(\d{2}:\d{2})",
         "{0}-{1}-{2} {3}"),
        (r"(\d{2})[/\-.](\d{2})[/\-.](\d{4})",
         "{2}-{1}-{0}"),
    ]

    # Upper-casing makes the month-name lookup case-insensitive; it does not
    # affect the numeric patterns.
    upper = text.upper()

    for pattern, template in numeric_patterns:
        match = re.search(pattern, upper)
        if match:
            return template.format(*match.groups())

    # Fallback: dates that spell the month out. Tried last so every input the
    # numeric patterns recognise keeps its existing result.
    named = re.search(
        r"\b(\d{1,2})[\s/\-.]*([A-Z]{3})[A-Z]*[\s/\-.,]*(\d{4})(?!\d)"
        r"(?:\s+(\d{2}:\d{2}))?",
        upper,
    )
    if named:
        day, month_token, year, clock = named.groups()
        month = month_numbers.get(month_token)
        if month is not None:
            stamp = f"{year}-{month:02d}-{int(day):02d}"
            return f"{stamp} {clock}" if clock else stamp

    return None
def classify_items(items):
    """Assign an expense category to every entry of *items*.

    The items are sent to the classifier model as one batch, one item per row.
    For each item the arg-max output selects the label from EXPENSE_CATEGORIES
    and the maximum output value (rounded to 3 decimals) is reported as the
    confidence.

    Returns:
        A list of {"text", "category", "confidence"} dicts in input order;
        an empty list when *items* is empty.
    """
    if not items:
        return []

    batch = tf.constant([[entry] for entry in items])
    scores = clf_model(batch, training=False).numpy()

    classified = []
    for entry, row in zip(items, scores):
        best = int(np.argmax(row))
        classified.append(
            {
                "text": entry,
                "category": EXPENSE_CATEGORIES[best],
                "confidence": round(float(np.max(row)), 3),
            }
        )
    return classified
def draw_results(image, detections):
    """Return a copy of *image* with every detection drawn onto it.

    For each detection the quad outline is drawn in the class colour (grey for
    unknown classes), with "<class> <confidence%>" just above the first corner
    and, if text was recognised, the first 30 characters of it in white one
    line higher. The input image is not modified.
    """
    canvas = image.copy()
    for det in detections:
        quad = det["quad"]
        colour = CLASS_COLORS.get(det["class"], (200, 200, 200))

        outline = quad.astype(np.int32).reshape((-1, 1, 2))
        cv2.polylines(canvas, [outline], isClosed=True, color=colour, thickness=2)

        # Anchor the labels slightly above the quad's first corner.
        label_x = int(quad[0][0])
        label_y = int(quad[0][1]) - 8
        cv2.putText(
            canvas,
            f"{det['class']} {det['conf']:.0%}",
            (label_x, label_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            colour,
            1,
            cv2.LINE_AA,
        )
        if det.get("text"):
            cv2.putText(
                canvas,
                f"\"{det['text'][:30]}\"",
                (label_x, label_y - 16),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.38,
                (255, 255, 255),
                1,
                cv2.LINE_AA,
            )
    return canvas
| # --------------------------------------------------------------------------- | |
| # Pipeline dengan log streaming (untuk UI) | |
| # --------------------------------------------------------------------------- | |
| import time as _time | |
def _build_result(output, classified):
    """Assemble the final result dict from the recognised text and item classes.

    Args:
        output: mapping of class name -> list of recognised strings.
        classified: list of dicts that each carry a "category" key.

    Returns:
        A dict with the first store name, date/time and total (raw and parsed),
        the raw line items, the classified items, and a category -> count
        summary ordered by descending count.
    """
    def first_text(key):
        # First recognised string for the class, or "" when the class is absent.
        return output.get(key, [""])[0]

    total_raw = first_text("total_belanja")
    date_raw = first_text("tanggal_waktu")
    total_value = parse_amount(total_raw)

    counts = {}
    for entry in classified:
        category = entry["category"]
        counts[category] = counts.get(category, 0) + 1
    # Stable sort: categories with equal counts keep first-seen order.
    ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)

    return {
        "nama_toko": first_text("nama_toko"),
        "tanggal_waktu": date_raw,
        "tanggal_parsed": parse_datetime(date_raw),
        "total_belanja": total_raw,
        "total_parsed": total_value,
        "line_item": output.get("line_item", []),
        "line_item_classified": classified,
        "kategori_summary": dict(ranked),
    }
def _class_label(cls_id):
    """Return the YOLO class name for *cls_id*, or "class_<id>" if it is unknown."""
    if 0 <= cls_id < len(YOLO_CLASSES):
        return YOLO_CLASSES[cls_id]
    return f"class_{cls_id}"


def predict(image_pil, confidence):
    """Run the full pipeline for the UI, streaming a growing log.

    This is a generator. Every yield is a 3-tuple
    ``(log_text, annotated_image, result_json)``; intermediate yields carry
    ``None`` and ``"{}"`` for the last two, and only the final yield carries
    the annotated PIL image and the JSON result.

    Args:
        image_pil: input image as a PIL image, or None.
        confidence: confidence threshold passed to the YOLO model.
    """
    if image_pil is None:
        yield "β οΈ Tidak ada gambar.", None, "{}"
        return

    t0 = _time.time()
    log = []

    def emit(line):
        # Append one line and return the whole log so far as a single string.
        log.append(line)
        return "\n".join(log)

    image = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)

    # --- Stage 1: YOLO oriented-box detection --------------------------------
    yield emit("[ Stage 1 / 3 ] YOLOv8-OBB β deteksi region struk ..."), None, "{}"
    results = yolo_model(image, conf=confidence, verbose=False)[0]
    obb = results.obb
    if obb is None or len(obb) == 0:
        yield emit("β οΈ Tidak ada region terdeteksi. Coba turunkan confidence."), None, "{}"
        return

    quads = obb.xyxyxyxy.cpu().numpy().reshape(-1, 4, 2)
    class_ids = obb.cls.cpu().numpy().astype(int)
    confs = obb.conf.cpu().numpy()
    # Resolve names once with the guarded lookup, so an unexpected class id
    # cannot raise IndexError while building the log line below.
    class_names = [_class_label(c) for c in class_ids]
    yield emit(f" β {len(quads)} region terdeteksi: {class_names}"), None, "{}"

    # --- Stage 2: CRNN text recognition per region ---------------------------
    yield emit("\n[ Stage 2 / 3 ] CRNN+CTC β baca teks per region ..."), None, "{}"
    output, detections = {}, []
    for i, (quad, cls_name, conf) in enumerate(zip(quads, class_names, confs)):
        crop = deskew_crop(image, quad)
        tensor = preprocess_crop(crop)
        logits = crnn_model(tensor, training=False).numpy()[0]
        text = ctc_decode(logits)
        output.setdefault(cls_name, []).append(text)
        detections.append({"class": cls_name, "conf": float(conf), "text": text, "quad": quad})
        yield emit(f" [{i+1}/{len(quads)}] {cls_name:<16} \"{text[:40]}\""), None, "{}"

    # --- Stage 3: line-item classification -----------------------------------
    raw_items = output.get("line_item", [])
    yield emit(f"\n[ Stage 3 / 3 ] Classifier β kategorikan {len(raw_items)} item ..."), None, "{}"
    classified = classify_items(raw_items)
    for it in classified:
        yield emit(f" β’ {it['text'][:30]:<32} β {it['category']} ({it['confidence']*100:.0f}%)"), None, "{}"

    # --- Final summary --------------------------------------------------------
    result = _build_result(output, classified)
    annotated = draw_results(image, detections)
    ann_pil = Image.fromarray(cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB))
    elapsed = _time.time() - t0

    # Compare against None: a successfully parsed total of 0 is still a parsed
    # value and should be formatted rather than replaced by the raw text.
    if result["total_parsed"] is not None:
        total_fmt = f"Rp {result['total_parsed']:,.0f}".replace(",", ".")
    else:
        total_fmt = result["total_belanja"]
    summary = result["kategori_summary"]

    log.append(f"\n{'β'*55}")
    log.append(f" Nama Toko : {result['nama_toko'] or '-'}")
    log.append(f" Tanggal/Waktu : {result['tanggal_parsed'] or result['tanggal_waktu'] or '-'}")
    log.append(f" Total Belanja : {total_fmt or '-'}")
    if summary:
        log.append(" Kategori : " + " | ".join(f"{k} ({v})" for k, v in summary.items()))
    log.append(f"{'β'*55}")
    log.append(f" Selesai dalam {elapsed:.1f} detik")
    yield "\n".join(log), ann_pil, json.dumps(result, ensure_ascii=False, indent=2)
# --- API endpoints (non-streaming, for the Next.js client) -------------------
def api_predict(image_pil: Image.Image, confidence: float = 0.25) -> str:
    """Run detection, text recognition and item classification in one call.

    Non-streaming counterpart of the UI pipeline.

    Args:
        image_pil: input image as a PIL image, or None.
        confidence: confidence threshold passed to the YOLO model.

    Returns:
        A JSON string: the result dict on success, ``{"error": ...}`` when no
        image was given, or ``{}`` when nothing was detected.
    """
    if image_pil is None:
        return json.dumps({"error": "Tidak ada gambar."})

    image = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
    results = yolo_model(image, conf=confidence, verbose=False)[0]
    obb = results.obb
    if obb is None or len(obb) == 0:
        return json.dumps({})

    quads = obb.xyxyxyxy.cpu().numpy().reshape(-1, 4, 2)
    class_ids = obb.cls.cpu().numpy().astype(int)

    # Only the recognised text per class is needed here; no annotated image is
    # produced, so per-detection records and confidences are not collected.
    output = {}
    for quad, cls_id in zip(quads, class_ids):
        cls_name = YOLO_CLASSES[cls_id] if cls_id < len(YOLO_CLASSES) else f"class_{cls_id}"
        crop = deskew_crop(image, quad)
        tensor = preprocess_crop(crop)
        logits = crnn_model(tensor, training=False).numpy()[0]
        output.setdefault(cls_name, []).append(ctc_decode(logits))

    classified = classify_items(output.get("line_item", []))
    result = _build_result(output, classified)
    return json.dumps(result, ensure_ascii=False)
def api_classify(items_json: str) -> str:
    """Classify a JSON array of item texts and return the outcome as JSON.

    Args:
        items_json: JSON text that must decode to a list.

    Returns:
        ``{"results": [...]}`` on success, otherwise ``{"error": "<message>"}``.
        Any exception raised while parsing or classifying is reported in the
        error field rather than propagated.
    """
    try:
        parsed = json.loads(items_json)
        if isinstance(parsed, list):
            payload = {"results": classify_items(parsed)}
            return json.dumps(payload, ensure_ascii=False)
        return json.dumps({"error": "'items' harus JSON array."})
    except Exception as exc:
        return json.dumps({"error": str(exc)})
| # --------------------------------------------------------------------------- | |
# Gradio UI (log-style layout)
| # --------------------------------------------------------------------------- | |
# Build the Gradio interface: an upload column on the left and a streaming
# pipeline log on the right, plus hidden components that expose API endpoints.
with gr.Blocks(title="NotePay OCR", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π§Ύ NotePay β Pipeline Log\n`YOLOv8-OBB` β `CRNN+CTC` β `Classifier`")
    with gr.Row():
        # Left column: inputs and the run button.
        with gr.Column(scale=1):
            inp_image = gr.Image(type="pil", label="Upload Foto Struk")
            inp_conf = gr.Slider(0.1, 0.9, value=0.25, step=0.05, label="Confidence YOLO")
            btn = gr.Button("βΆ Jalankan Pipeline", variant="primary", size="lg")
        # Right column: the log text that predict() streams.
        with gr.Column(scale=2):
            out_log = gr.Textbox(
                label="Pipeline Log",
                lines=35,
                max_lines=60,
                show_copy_button=True,
                placeholder="Log pipeline akan muncul di sini...",
            )
    # out_image and out_json stay in the layout but hidden: predict() yields
    # three values, so three output components are required.
    out_image = gr.Image(type="pil", visible=False)
    out_json = gr.Code(language="json", visible=False)
    # predict is a generator, so the outputs update on every yield.
    btn.click(fn=predict, inputs=[inp_image, inp_conf],
              outputs=[out_log, out_image, out_json],
              api_name="predict")
    gr.Markdown("---\n**Model:** [`NeoCode77/notepay-models`](https://huggingface.co/NeoCode77/notepay-models) Β· Coding Camp 2026 β DBS Foundation")
    # Hidden API endpoints: invisible components whose change events exist
    # only to register the named endpoints "api_predict" and "api_classify".
    with gr.Row(visible=False):
        _api_img = gr.Image(type="pil")
        _api_conf = gr.Number(value=0.25)
        _api_out = gr.Text()
        _api_items = gr.Text()
        _api_clf_out = gr.Text()
    _api_img.change(fn=api_predict, inputs=[_api_img, _api_conf],
                    outputs=_api_out, api_name="api_predict")
    _api_items.change(fn=api_classify, inputs=[_api_items],
                      outputs=_api_clf_out, api_name="api_classify")
# Listen on all interfaces on port 7860; show_error=True is passed so errors
# are shown in the UI.
demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)