Spaces:
Running
Running
| """PAN KYC screening API for a Hugging Face Docker Space. | |
| Run locally with: | |
| uvicorn main:app --host 0.0.0.0 --port 7860 | |
| This service performs preliminary image screening only; it does not prove | |
| that a PAN card is genuine, unedited, or physically present. | |
| """ | |
| import contextlib | |
| import hashlib | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import threading | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| from typing import Any | |
| # Must be set before Paddle/PaddleOCR is imported. | |
| os.environ.setdefault("FLAGS_use_mkldnn", "0") | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from huggingface_hub import hf_hub_download | |
| from paddleocr import PaddleOCR | |
| from PIL import Image, ImageOps, UnidentifiedImageError | |
| from ultralytics import YOLO | |
# Logger used by the screening engine (model loading and OCR failures).
ENGINE_LOGGER = logging.getLogger("pan_kyc")

# Tunable limits. Each one is read from the environment variable of the same
# name and falls back to the default given here.
# Minimum confidence the best PAN-card detection must reach to pass gate 2.
PAN_DETECTION_THRESHOLD = float(os.getenv("PAN_DETECTION_THRESHOLD", "0.80"))
# A device detection only counts when it meets both the confidence floor and
# the minimum share of the frame area.
DEVICE_CONFIDENCE_THRESHOLD = float(os.getenv("DEVICE_CONFIDENCE_THRESHOLD", "0.35"))
DEVICE_MIN_AREA_RATIO = float(os.getenv("DEVICE_MIN_AREA_RATIO", "0.12"))
# OCR lines scoring below this value are discarded.
OCR_MIN_CONFIDENCE = float(os.getenv("OCR_MIN_CONFIDENCE", "0.30"))
# Upper bound on character substitutions allowed when repairing a PAN candidate.
MAX_OCR_CORRECTIONS = int(os.getenv("MAX_OCR_CORRECTIONS", "2"))
# Largest accepted image, measured in pixels (width * height).
MAX_IMAGE_PIXELS = int(os.getenv("MAX_IMAGE_PIXELS", "25000000"))

# Prevent extremely large decompression-bomb images from being silently accepted.
Image.MAX_IMAGE_PIXELS = MAX_IMAGE_PIXELS
# Entity codes accepted at index 3 (the fourth character) of a PAN, mapped to
# the human-readable classification returned to the caller.
PAN_ENTITY_MAP = {
    "P": "Person (Individual)",
    "C": "Company",
    "F": "Firm / Limited Liability Partnership (LLP)",
    "H": "Hindu Undivided Family (HUF)",
    "T": "Trust",
    "A": "Association of Persons (AOP)",
    "B": "Body of Individuals (BOI)",
    "G": "Government Agency",
    "L": "Local Authority",
    "J": "Artificial Juridical Person",
}

# Digit -> letter substitutions applied when a digit appears in a position
# that must hold a letter.
LETTER_FIX = {
    "0": "O",
    "1": "I",
    "2": "Z",
    "5": "S",
    "6": "G",
    "8": "B",
}

# Letter -> digit substitutions applied when a letter appears in a position
# that must hold a digit.
DIGIT_FIX = {
    "O": "0",
    "Q": "0",
    "D": "0",
    "I": "1",
    "L": "1",
    "Z": "2",
    "S": "5",
    "G": "6",
    "B": "8",
}
# Exact PAN layout: five letters, four digits, one letter.
STRICT_PAN_REGEX = re.compile(r"^[A-Z]{5}[0-9]{4}[A-Z]$")

# The PAN detector checkpoint is pinned to one repository revision and one
# file hash; the hash is checked after download.
PAN_MODEL_REPO = "foduucom/pan-card-detection"
PAN_MODEL_FILENAME = "best.pt"
PAN_MODEL_REVISION = "5b6395bcfda0814d8817dc6a446fd70533f88a24"
PAN_MODEL_SHA256 = "a8721936f8585a53227445f997e1ebe10af5ba7faacd3602c01d65514c8dbbc8"

# COCO class IDs used by yolov8n.pt.
DEVICE_CLASSES = {62, 63, 67}  # tv, laptop, cell phone
class InvalidImageError(ValueError):
    """Signal that an uploaded payload cannot be accepted as an image.

    Subclasses ``ValueError`` so generic value-error handling still applies.
    """

    pass
| def sha256_file(path: str | Path, chunk_size: int = 1024 * 1024) -> str: | |
| digest = hashlib.sha256() | |
| with open(path, "rb") as file: | |
| while chunk := file.read(chunk_size): | |
| digest.update(chunk) | |
| return digest.hexdigest() | |
@contextlib.contextmanager
def allow_legacy_checkpoint_load():
    """Temporarily force ``torch.load`` to run with ``weights_only=False``.

    The pinned PAN checkpoint is a legacy full-model PyTorch pickle.
    This context is used only after the exact file hash is verified.

    ``torch.load`` is replaced for the duration of the ``with`` block and is
    always restored on exit, including when the body raises. The replacement
    is process-wide, so other threads calling ``torch.load`` during the block
    also get ``weights_only=False``.
    """
    original_load = torch.load

    def patched_load(*args: Any, **kwargs: Any):
        # Override whatever the caller asked for; the legacy pickle cannot be
        # read in weights-only mode.
        kwargs["weights_only"] = False
        return original_load(*args, **kwargs)

    torch.load = patched_load
    try:
        yield
    finally:
        torch.load = original_load
def download_verified_pan_checkpoint() -> str:
    """Download the pinned PAN detector checkpoint and verify its hash.

    Returns:
        Local path of the downloaded checkpoint file.

    Raises:
        RuntimeError: If the file's SHA-256 differs from ``PAN_MODEL_SHA256``.
    """
    checkpoint_path = hf_hub_download(
        repo_id=PAN_MODEL_REPO,
        filename=PAN_MODEL_FILENAME,
        revision=PAN_MODEL_REVISION,
    )
    observed_digest = sha256_file(checkpoint_path)
    if observed_digest == PAN_MODEL_SHA256:
        return checkpoint_path

    # Refuse to hand back a file whose content does not match the pin.
    raise RuntimeError(
        "PAN model hash verification failed. "
        f"Expected {PAN_MODEL_SHA256}, received {observed_digest}."
    )
def build_ocr_reader() -> PaddleOCR:
    """Construct the English PaddleOCR reader used by the OCR gate.

    Runs on CPU with two threads and MKL-DNN disabled. Document orientation
    classification, unwarping, and text-line orientation are all turned off.
    Recognised lines scoring below ``OCR_MIN_CONFIDENCE`` are dropped by the
    reader itself.
    """
    reader_options = {
        "lang": "en",
        "use_doc_orientation_classify": False,
        "use_doc_unwarping": False,
        "use_textline_orientation": False,
        "engine": "paddle",
        "device": "cpu",
        "enable_mkldnn": False,
        "cpu_threads": 2,
        "text_rec_score_thresh": OCR_MIN_CONFIDENCE,
    }
    return PaddleOCR(**reader_options)
def decode_image(image_bytes: bytes) -> tuple[np.ndarray, int, int]:
    """Decode uploaded bytes into a BGR array plus its width and height.

    The image is rotated according to its EXIF orientation and converted to
    RGB before being turned into the BGR array returned to the caller. The
    returned width and height are those of the rotated image.

    Size limits are checked from the image header before any pixel data is
    decoded, so an oversized upload is rejected without allocating its full
    bitmap. EXIF rotation only swaps or keeps the two dimensions, so checking
    the unrotated size gives the same verdict.

    Raises:
        InvalidImageError: If the upload is empty, unreadable, smaller than
            64 pixels on either side, or above ``MAX_IMAGE_PIXELS``.
    """
    if not image_bytes:
        raise InvalidImageError("Uploaded file is empty.")

    too_large_message = f"Image exceeds the {MAX_IMAGE_PIXELS:,}-pixel safety limit."

    try:
        with Image.open(io.BytesIO(image_bytes)) as source:
            header_width, header_height = source.size
            if header_width < 64 or header_height < 64:
                raise InvalidImageError("Image is too small. Minimum dimension is 64 pixels.")
            if header_width * header_height > MAX_IMAGE_PIXELS:
                raise InvalidImageError(too_large_message)

            image = ImageOps.exif_transpose(source)
            image.load()
            width, height = image.size
            rgb_array = np.asarray(image.convert("RGB"))
    except InvalidImageError:
        # InvalidImageError is a ValueError; re-raise it untouched so the
        # specific message is not replaced by the generic one below.
        raise
    except Image.DecompressionBombError as error:
        # Pillow raises this (a plain Exception subclass) on open when the
        # image is far above Image.MAX_IMAGE_PIXELS.
        raise InvalidImageError(too_large_message) from error
    except (UnidentifiedImageError, OSError, ValueError) as error:
        raise InvalidImageError("The upload is not a readable JPG, JPEG, PNG, or WEBP image.") from error

    bgr_array = cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
    return bgr_array, width, height
def extract_ocr_tokens(ocr_reader: PaddleOCR, image_bgr: np.ndarray) -> list[str]:
    """Run OCR on *image_bgr* and return the confident, non-empty text lines.

    Written for PaddleOCR 3.x results but tolerant of small shape differences:
    a result's ``json`` may be an attribute or a method, may be a JSON string,
    and may or may not nest its data under a ``"res"`` key. Results that do
    not resolve to a dict are skipped.
    """
    kept: list[str] = []

    for page in ocr_reader.predict(image_bgr):
        report = getattr(page, "json", {})
        if callable(report):
            report = report()
        if isinstance(report, str):
            report = json.loads(report)
        if not isinstance(report, dict):
            continue

        body = report.get("res", report)
        if not isinstance(body, dict):
            continue

        lines = body.get("rec_texts", []) or []
        confidences = body.get("rec_scores", []) or []
        # Without one score per line, treat every line as fully confident.
        if len(confidences) != len(lines):
            confidences = [1.0] * len(lines)

        for line, confidence in zip(lines, confidences):
            stripped = str(line).strip()
            if not stripped:
                continue
            if float(confidence) >= OCR_MIN_CONFIDENCE:
                kept.append(stripped)

    return kept
def crop_with_padding(
    image_bgr: np.ndarray,
    xyxy: list[float],
    padding_ratio: float = 0.03,
) -> np.ndarray:
    """Crop the ``xyxy`` box out of *image_bgr* with a proportional margin.

    The margin on each axis is ``padding_ratio`` times the box size on that
    axis, and the padded box is clamped to the image bounds. If the resulting
    crop is empty, the whole input image is returned instead.
    """
    rows, cols = image_bgr.shape[:2]
    left, top, right, bottom = (float(coordinate) for coordinate in xyxy)

    margin_x = (right - left) * padding_ratio
    margin_y = (bottom - top) * padding_ratio

    col_start = max(0, int(left - margin_x))
    row_start = max(0, int(top - margin_y))
    col_stop = min(cols, int(right + margin_x))
    row_stop = min(rows, int(bottom + margin_y))

    region = image_bgr[row_start:row_stop, col_start:col_stop]
    if region.size:
        return region
    return image_bgr
def upscale_for_ocr(image_bgr: np.ndarray, target_width: int = 1400) -> np.ndarray:
    """Enlarge *image_bgr* so its width reaches ``target_width``.

    The scale factor is never below 1, so images that are already wide enough
    keep their size. Both axes use the same factor and the resize is bicubic.
    An image with a zero-sized axis is returned untouched.
    """
    rows, cols = image_bgr.shape[:2]
    if min(rows, cols) <= 0:
        return image_bgr

    factor = target_width / cols
    if factor < 1.0:
        factor = 1.0

    resized_width = int(cols * factor)
    resized_height = int(rows * factor)
    return cv2.resize(
        image_bgr,
        (resized_width, resized_height),
        interpolation=cv2.INTER_CUBIC,
    )
def enhance_for_ocr(image_bgr: np.ndarray) -> np.ndarray:
    """Return an upscaled, contrast-equalised, sharpened copy of *image_bgr*.

    Steps: upscale, apply CLAHE to the lightness channel in LAB space, convert
    back to BGR, then sharpen by subtracting a Gaussian-blurred copy.
    """
    lab_image = cv2.cvtColor(upscale_for_ocr(image_bgr), cv2.COLOR_BGR2LAB)
    luma, chroma_a, chroma_b = cv2.split(lab_image)

    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    equalized_lab = cv2.merge((equalizer.apply(luma), chroma_a, chroma_b))
    contrast_bgr = cv2.cvtColor(equalized_lab, cv2.COLOR_LAB2BGR)

    softened = cv2.GaussianBlur(contrast_bgr, (0, 0), 1.0)
    # The weights sum to 1.0, so the blurred copy is subtracted without
    # changing overall brightness.
    return cv2.addWeighted(contrast_bgr, 1.45, softened, -0.45, 0)
def build_ocr_variants(
    card_bgr: np.ndarray,
    full_image_bgr: np.ndarray,
) -> list[tuple[str, np.ndarray]]:
    """Build the named image variants that OCR is attempted on, in order.

    The order is: upscaled card, enhanced card, a lower band of the enhanced
    card (rows 45%-90%, columns 0%-82%; skipped when empty), and finally the
    enhanced full image.
    """
    upscaled_card = upscale_for_ocr(card_bgr)
    enhanced_card = enhance_for_ocr(card_bgr)
    named_variants: list[tuple[str, np.ndarray]] = [
        ("card-upscaled", upscaled_card),
        ("card-enhanced", enhanced_card),
    ]

    rows, cols = enhanced_card.shape[:2]
    band_top, band_bottom = int(rows * 0.45), int(rows * 0.90)
    band_right = int(cols * 0.82)
    lower_band = enhanced_card[band_top:band_bottom, 0:band_right]
    if lower_band.size:
        named_variants.append(("card-lower-region", lower_band))

    named_variants.append(("full-image-enhanced", enhance_for_ocr(full_image_bgr)))
    return named_variants
def normalize_pan_candidate(raw_candidate: str) -> str | None:
    """Repair likely OCR confusions in a 10-character PAN candidate.

    The input is upper-cased and stripped of everything except ``A-Z0-9``.
    Indexes 0-4 and 9 must be letters and indexes 5-8 must be digits; a
    character of the wrong kind is swapped through ``LETTER_FIX`` or
    ``DIGIT_FIX``.

    Returns:
        The repaired PAN, or ``None`` when the cleaned text is not exactly ten
        characters, a character has no substitution, more than
        ``MAX_OCR_CORRECTIONS`` substitutions were needed, the result does not
        match ``STRICT_PAN_REGEX``, or the entity code at index 3 is unknown.
    """
    compact = re.sub(r"[^A-Z0-9]", "", raw_candidate.upper())
    if len(compact) != 10:
        return None

    repaired: list[str] = []
    substitutions = 0
    for position, symbol in enumerate(compact):
        wants_digit = 5 <= position <= 8
        if wants_digit:
            acceptable = symbol.isdigit()
            fix_table = DIGIT_FIX
        else:
            acceptable = "A" <= symbol <= "Z"
            fix_table = LETTER_FIX

        if acceptable:
            repaired.append(symbol)
            continue

        swapped = fix_table.get(symbol)
        if swapped is None:
            return None
        repaired.append(swapped)
        substitutions += 1

    if substitutions > MAX_OCR_CORRECTIONS:
        return None

    candidate = "".join(repaired)
    if STRICT_PAN_REGEX.fullmatch(candidate) is None:
        return None
    return candidate if candidate[3] in PAN_ENTITY_MAP else None
def windows_of_10(text: str):
    """Yield every 10-character sliding window of the cleaned *text*.

    Cleaning upper-cases the text and removes everything except ``A-Z0-9``.
    Nothing is yielded when fewer than ten characters remain.
    """
    compact = re.sub(r"[^A-Z0-9]", "", text.upper())
    # range() is empty when fewer than ten characters are left.
    yield from (
        compact[offset:offset + 10]
        for offset in range(len(compact) - 9)
    )
def find_pan_number(ocr_tokens: list[str]) -> str | None:
    """Return the first valid PAN recoverable from the OCR lines, else ``None``.

    Individual lines are searched first, then every run of two adjacent lines
    joined together, then every run of three. Each distinct 10-character
    window is normalised at most once.
    """
    # Join only nearby OCR lines; never concatenate the whole document blindly.
    joined_neighbours = [
        "".join(ocr_tokens[first:first + span])
        for span in (2, 3)
        for first in range(len(ocr_tokens) - span + 1)
    ]

    already_tried: set[str] = set()
    for text in [*ocr_tokens, *joined_neighbours]:
        for window in windows_of_10(text):
            if window in already_tried:
                continue
            already_tried.add(window)
            pan = normalize_pan_candidate(window)
            if pan:
                return pan
    return None
def mask_pan(pan: str) -> str:
    """Return *pan* with the characters after the first five replaced by ``****``, keeping the last one."""
    visible_prefix = pan[:5]
    visible_suffix = pan[-1]
    return "".join((visible_prefix, "****", visible_suffix))
class PanKycEngine:
    """Run the four-gate PAN screening pipeline over an uploaded image.

    ``load_models`` must be called once before ``analyze_bytes``. Analysis runs
    the gates in order and stops at the first one that fails:

    1. device-risk heuristic (large TV / laptop / phone in frame),
    2. PAN-card region detection,
    3. OCR over several image variants,
    4. PAN format and entity-code validation.
    """

    def __init__(self) -> None:
        """Create an engine with no models loaded."""
        self.device_detector: YOLO | None = None
        self.pan_detector: YOLO | None = None
        self.ocr_reader: PaddleOCR | None = None
        self.yolo_device: int | str = "cpu"
        self.loaded = False
        # Held for the whole of the gate sequence in analyze_bytes.
        self._inference_lock = threading.Lock()

    def load_models(self) -> None:
        """Load both YOLO detectors and the OCR reader; no-op if already loaded."""
        if self.loaded:
            return

        ENGINE_LOGGER.info("Loading PAN KYC models...")
        self.yolo_device = 0 if torch.cuda.is_available() else "cpu"
        self.device_detector = YOLO("yolov8n.pt")

        pan_model_path = download_verified_pan_checkpoint()
        # The legacy-pickle loader is enabled only around this one load, after
        # the checkpoint hash has been verified.
        with allow_legacy_checkpoint_load():
            self.pan_detector = YOLO(pan_model_path)

        self.ocr_reader = build_ocr_reader()
        self.loaded = True
        ENGINE_LOGGER.info("Models loaded. YOLO device=%s", self.yolo_device)

    def _require_loaded(self) -> None:
        """Raise ``RuntimeError`` unless every model is loaded and available."""
        if not self.loaded:
            raise RuntimeError("Models are not loaded.")
        if self.device_detector is None or self.pan_detector is None or self.ocr_reader is None:
            raise RuntimeError("One or more models are unavailable.")

    def _run_device_gate(self, image_bgr: np.ndarray) -> dict[str, Any]:
        """Gate 1: flag a large TV, laptop, or phone in the frame.

        A detection counts only if its class is in ``DEVICE_CLASSES`` and it
        meets both ``DEVICE_CONFIDENCE_THRESHOLD`` and
        ``DEVICE_MIN_AREA_RATIO``. The highest-confidence qualifying detection
        is reported. The gate passes when none qualifies.
        """
        assert self.device_detector is not None

        image_height, image_width = image_bgr.shape[:2]
        # Guard against division by zero for a degenerate image.
        image_area = max(1, image_height * image_width)

        results = self.device_detector.predict(
            image_bgr,
            verbose=False,
            device=self.yolo_device,
        )
        boxes = results[0].boxes

        best_device: dict[str, Any] | None = None
        if boxes is not None:
            for class_tensor, confidence_tensor, coordinates_tensor in zip(
                boxes.cls,
                boxes.conf,
                boxes.xyxy,
            ):
                class_id = int(class_tensor.item())
                if class_id not in DEVICE_CLASSES:
                    continue

                confidence = float(confidence_tensor.item())
                x1, y1, x2, y2 = coordinates_tensor.tolist()
                area_ratio = max(0.0, (x2 - x1) * (y2 - y1)) / image_area
                if (
                    confidence < DEVICE_CONFIDENCE_THRESHOLD
                    or area_ratio < DEVICE_MIN_AREA_RATIO
                ):
                    continue

                if best_device is None or confidence > best_device["confidence"]:
                    best_device = {
                        "name": str(self.device_detector.names[class_id]),
                        "class_id": class_id,
                        "confidence": round(confidence, 4),
                        "frame_area_ratio": round(area_ratio, 4),
                    }

        return {
            "passed": best_device is None,
            "possible_device_presentation": best_device,
            "note": "Heuristic only; this does not prove or disprove a spoof attack.",
        }

    def _run_pan_visual_gate(
        self,
        image_bgr: np.ndarray,
    ) -> tuple[dict[str, Any], np.ndarray]:
        """Gate 2: detect a PAN-card region.

        Returns the gate report and the image to run OCR on: the padded crop
        of the most confident detection when it reaches
        ``PAN_DETECTION_THRESHOLD``, otherwise the full input image.
        """
        assert self.pan_detector is not None

        results = self.pan_detector.predict(
            image_bgr,
            verbose=False,
            device=self.yolo_device,
        )
        boxes = results[0].boxes

        best_confidence = 0.0
        detected_card = image_bgr
        if boxes is not None and len(boxes) > 0:
            best_index = int(torch.argmax(boxes.conf).item())
            best_confidence = float(boxes.conf[best_index].item())
            if best_confidence >= PAN_DETECTION_THRESHOLD:
                detected_card = crop_with_padding(
                    image_bgr,
                    boxes.xyxy[best_index].tolist(),
                )

        report = {
            "passed": best_confidence >= PAN_DETECTION_THRESHOLD,
            "confidence": round(best_confidence, 4),
            "threshold": PAN_DETECTION_THRESHOLD,
            "note": "A detector match does not establish document authenticity.",
        }
        return report, detected_card

    def _run_ocr_gate(
        self,
        card_bgr: np.ndarray,
        full_image_bgr: np.ndarray,
        debug: bool,
    ) -> tuple[dict[str, Any], list[str]]:
        """Gate 3: OCR the image variants until a PAN candidate appears.

        Variants are tried in order and their lines are merged, skipping
        duplicates that differ only in case or whitespace. A variant that
        raises is logged and skipped. Processing stops early once the merged
        lines contain a valid PAN.

        Returns the gate report and the merged OCR lines. With ``debug`` the
        report also carries the lines and the failure messages; otherwise only
        a failure count is included, and only when something failed.
        """
        assert self.ocr_reader is not None

        variants = build_ocr_variants(card_bgr, full_image_bgr)
        combined_tokens: list[str] = []
        seen: set[str] = set()
        successful_runs = 0
        failures: list[str] = []
        variant_counts: dict[str, int] = {}

        for variant_name, variant_image in variants:
            try:
                variant_tokens = extract_ocr_tokens(self.ocr_reader, variant_image)
            except Exception as error:  # Keep trying the remaining variants.
                ENGINE_LOGGER.exception("OCR failed for variant %s", variant_name)
                failures.append(f"{variant_name}: {type(error).__name__}: {error}")
                continue

            successful_runs += 1
            variant_counts[variant_name] = len(variant_tokens)

            for token in variant_tokens:
                key = re.sub(r"\s+", " ", token.strip().upper())
                if key and key not in seen:
                    seen.add(key)
                    combined_tokens.append(token)

            if find_pan_number(combined_tokens):
                break

        gate: dict[str, Any] = {
            "passed": successful_runs > 0 and bool(combined_tokens),
            "engine_ran_successfully": successful_runs > 0,
            "successful_variant_runs": successful_runs,
            "retained_line_count": len(combined_tokens),
            "variant_line_counts": variant_counts,
        }
        if debug:
            gate["ocr_tokens"] = combined_tokens
            gate["failures"] = failures
        elif failures:
            gate["failure_count"] = len(failures)

        return gate, combined_tokens

    # Declared static because it takes no ``self`` yet is called as
    # ``self._base_response(...)``; without this the call passes one argument
    # too many and raises TypeError.
    @staticmethod
    def _base_response(
        request_id: str,
        filename: str,
        width: int,
        height: int,
    ) -> dict[str, Any]:
        """Return the response skeleton with all outcome fields unset."""
        return {
            "request_id": request_id,
            "filename": filename,
            "image": {"width": width, "height": height},
            "decision": None,
            "status": None,
            "failed_gate": None,
            "reason": None,
            "result": None,
            "gates": {},
            "disclaimer": (
                "This endpoint performs preliminary image screening only. "
                "It does not prove that a PAN card is genuine, unedited, or physically present."
            ),
        }

    @staticmethod
    def _finish(
        response: dict[str, Any],
        started: float,
        **outcome: Any,
    ) -> dict[str, Any]:
        """Write the outcome fields and elapsed milliseconds into *response*."""
        response.update(**outcome)
        response["processing_ms"] = round((time.perf_counter() - started) * 1000, 2)
        return response

    def analyze_bytes(
        self,
        image_bytes: bytes,
        filename: str,
        *,
        include_full_pan: bool = False,
        debug: bool = False,
    ) -> dict[str, Any]:
        """Screen one uploaded image and return the full gate-by-gate report.

        Args:
            image_bytes: Raw bytes of the uploaded image.
            filename: Name echoed back in the report.
            include_full_pan: Return the unmasked PAN in ``result.pan_number``.
            debug: Include OCR lines and failure messages in the OCR gate report.

        Returns:
            The report dict. ``decision`` is ``"accepted"``, ``"rejected"``, or
            ``"error"``, and ``processing_ms`` holds the elapsed time.

        Raises:
            RuntimeError: If the models have not been loaded.
            InvalidImageError: If the upload cannot be decoded or is out of bounds.
        """
        self._require_loaded()

        started = time.perf_counter()
        request_id = uuid.uuid4().hex
        image_bgr, width, height = decode_image(image_bytes)
        response = self._base_response(request_id, filename, width, height)

        # PaddleOCR and model objects are kept behind one lock for predictable
        # behaviour on small CPU Spaces. Scale horizontally for real traffic.
        with self._inference_lock:
            gate1 = self._run_device_gate(image_bgr)
            response["gates"]["gate_1_device_risk"] = gate1
            if not gate1["passed"]:
                return self._finish(
                    response,
                    started,
                    decision="rejected",
                    status="rejected_gate_1_device_risk",
                    failed_gate=1,
                    reason="A large phone, laptop, or TV was detected in the frame.",
                )

            gate2, card_bgr = self._run_pan_visual_gate(image_bgr)
            response["gates"]["gate_2_pan_visual"] = gate2
            if not gate2["passed"]:
                return self._finish(
                    response,
                    started,
                    decision="rejected",
                    status="rejected_gate_2_pan_not_detected",
                    failed_gate=2,
                    reason="No PAN-card-like region reached the configured confidence threshold.",
                )

            gate3, ocr_tokens = self._run_ocr_gate(card_bgr, image_bgr, debug)
            response["gates"]["gate_3_ocr"] = gate3
            if not gate3["engine_ran_successfully"]:
                # Every variant raised: a processing fault, not a rejection.
                return self._finish(
                    response,
                    started,
                    decision="error",
                    status="processing_error_gate_3_ocr",
                    failed_gate=3,
                    reason="The OCR engine failed before completing any OCR attempt.",
                )
            if not ocr_tokens:
                return self._finish(
                    response,
                    started,
                    decision="rejected",
                    status="rejected_gate_3_no_text",
                    failed_gate=3,
                    reason="OCR completed but returned no sufficiently confident text.",
                )

            detected_pan = find_pan_number(ocr_tokens)
            response["gates"]["gate_4_pan_validation"] = {
                "passed": detected_pan is not None,
                "format": "AAAAA9999A",
                "max_ocr_corrections": MAX_OCR_CORRECTIONS,
            }
            if detected_pan is None:
                return self._finish(
                    response,
                    started,
                    decision="rejected",
                    status="rejected_gate_4_pan_not_found",
                    failed_gate=4,
                    reason="OCR text was found, but no valid PAN-format candidate was recovered.",
                )

            entity_code = detected_pan[3]
            masked_pan = mask_pan(detected_pan)
            return self._finish(
                response,
                started,
                decision="accepted",
                status="accepted_for_further_kyc_checks",
                failed_gate=None,
                reason="PAN format and entity character passed preliminary screening.",
                result={
                    "pan_number": detected_pan if include_full_pan else masked_pan,
                    "pan_is_masked": not include_full_pan,
                    "masked_pan": masked_pan,
                    "entity_code": entity_code,
                    "classification": PAN_ENTITY_MAP[entity_code],
                    "routing": (
                        "PERSONAL_ROUTE" if entity_code == "P" else "BUSINESS_ENTITY_ROUTE"
                    ),
                    "authenticity_proven": False,
                },
            )
| # ========================= FASTAPI APPLICATION ========================= | |
| import hmac | |
| import logging | |
| import os | |
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| from typing import Annotated | |
| from fastapi import Depends, FastAPI, File, Header, HTTPException, Query, Request, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from starlette.concurrency import run_in_threadpool | |
# Root logging configuration; the level comes from LOG_LEVEL (default INFO).
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO").upper(),
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
API_LOGGER = logging.getLogger("pan_kyc_api")

# Upload size cap, configured in megabytes and enforced in bytes.
MAX_UPLOAD_MB = int(os.getenv("MAX_UPLOAD_MB", "10"))
MAX_UPLOAD_BYTES = MAX_UPLOAD_MB * 1024 * 1024
# Shared secret for the X-API-Key header; an empty value disables the check.
API_KEY = os.getenv("API_KEY", "").strip()
def get_allowed_origins() -> list[str]:
    """Parse the comma-separated ``ALLOWED_ORIGINS`` variable into a list.

    Blank entries are dropped. An unset variable, or one containing only
    blanks, yields ``["*"]``.
    """
    configured = os.getenv("ALLOWED_ORIGINS", "*")
    parsed: list[str] = []
    for piece in configured.split(","):
        trimmed = piece.strip()
        if trimmed:
            parsed.append(trimmed)
    if parsed:
        return parsed
    return ["*"]
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load the screening models before the application starts serving.

    Model loading is blocking, so it runs in the thread pool instead of on the
    event loop. The loaded engine is stored on ``app.state.engine`` for the
    request handlers. Nothing is torn down on shutdown.
    """
    engine = PanKycEngine()
    await run_in_threadpool(engine.load_models)
    app.state.engine = engine
    yield
# Application instance; `lifespan` loads the models before `app.state.engine`
# is made available to request handlers.
app = FastAPI(
    title="PAN KYC Screening API",
    version="1.0.0",
    description=(
        "Preliminary PAN-image screening with a device-risk heuristic, "
        "PAN-region detection, PaddleOCR, PAN format validation, and entity routing."
    ),
    lifespan=lifespan,
)

origins = get_allowed_origins()
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    # Credentials are enabled only when the origin list is something other
    # than the lone catch-all "*".
    allow_credentials=origins != ["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)
def require_api_key(
    x_api_key: Annotated[str | None, Header(alias="X-API-Key")] = None,
) -> None:
    """Require X-API-Key only when the API_KEY Space secret is configured.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match.
    """
    if not API_KEY:
        return
    if x_api_key is None:
        raise HTTPException(status_code=401, detail="Missing or invalid X-API-Key header.")

    # hmac.compare_digest only accepts ASCII-only str arguments. A header
    # value with any non-ASCII character would raise TypeError, so compare
    # the encoded bytes instead.
    supplied = x_api_key.encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Missing or invalid X-API-Key header.")
# Registered on "/" so the service description is actually reachable; an
# undecorated function is never served by FastAPI.
@app.get("/")
def root() -> dict:
    """Return a short service description with pointers to the other routes."""
    return {
        "service": "PAN KYC Screening API",
        "status": "running",
        "docs": "/docs",
        "health": "/health",
        "endpoint": "POST /analyze-pan",
    }
# Registered on "/health", the path advertised by the root endpoint; an
# undecorated function is never served by FastAPI.
@app.get("/health")
def health(request: Request) -> dict:
    """Report whether the engine exists and has finished loading its models.

    ``status`` is ``"ok"`` once the engine is loaded and ``"starting"``
    otherwise. ``yolo_device`` is ``None`` until an engine is attached to the
    application state.
    """
    engine: PanKycEngine | None = getattr(request.app.state, "engine", None)
    models_ready = bool(engine and engine.loaded)
    return {
        "status": "ok" if models_ready else "starting",
        "models_loaded": models_ready,
        "yolo_device": engine.yolo_device if engine else None,
    }
# Engine status -> (public code, public message) for the compact response.
_COMPACT_OUTCOMES: dict[str, tuple[str, str]] = {
    "rejected_gate_1_device_risk": (
        "DEVICE_PRESENTATION_DETECTED",
        "A phone, laptop, or TV was detected in the uploaded image.",
    ),
    "rejected_gate_2_pan_not_detected": (
        "PAN_CARD_NOT_DETECTED",
        "Uploaded image was not recognized as a PAN card.",
    ),
    "rejected_gate_3_no_text": (
        "PAN_TEXT_NOT_READABLE",
        "PAN card text could not be read clearly. Upload a clearer image.",
    ),
    "rejected_gate_4_pan_not_found": (
        "PAN_NUMBER_NOT_FOUND",
        "A PAN-like card was detected, but a valid PAN number was not found.",
    ),
    "processing_error_gate_3_ocr": (
        "OCR_PROCESSING_ERROR",
        "The OCR service could not process the image. Please try again.",
    ),
}


def _compact_response(result: dict[str, Any]) -> JSONResponse:
    """Collapse the engine report into the short public response.

    Accepted and rejected outcomes return HTTP 200; an engine processing error
    returns HTTP 503.
    """
    request_id = result.get("request_id")

    if result.get("decision") == "accepted":
        pan_result = result.get("result") or {}
        return JSONResponse(
            status_code=200,
            content={
                "request_id": request_id,
                "success": True,
                "valid_pan": True,
                "status": "accepted",
                "code": "VALID_PAN",
                "message": "PAN card detected and PAN number validated.",
                "data": {
                    "pan_number": pan_result.get("pan_number"),
                    "is_masked": pan_result.get("pan_is_masked", True),
                    "masked_pan": pan_result.get("masked_pan"),
                    "entity_code": pan_result.get("entity_code"),
                    "entity_type": pan_result.get("classification"),
                    "kyc_route": pan_result.get("routing"),
                },
            },
        )

    # Unknown statuses fall back to a generic code with the engine's reason.
    code, message = _COMPACT_OUTCOMES.get(
        result.get("status"),
        ("PAN_VALIDATION_FAILED", result.get("reason") or "PAN validation failed."),
    )
    is_processing_error = result.get("decision") == "error"
    return JSONResponse(
        status_code=503 if is_processing_error else 200,
        content={
            "request_id": request_id,
            "success": not is_processing_error,
            "valid_pan": False,
            "status": "error" if is_processing_error else "rejected",
            "code": code,
            "message": message,
            "data": None,
        },
    )


# Registered on the path advertised by the root endpoint. require_api_key is
# attached as a route dependency so the optional X-API-Key check is enforced.
@app.post("/analyze-pan", dependencies=[Depends(require_api_key)])
async def analyze_pan(
    request: Request,
    file: Annotated[UploadFile, File(description="PAN image: JPG, JPEG, PNG, or WEBP")],
    include_full_pan: Annotated[
        bool,
        Query(description="Return the full detected PAN instead of a masked PAN."),
    ] = False,
    debug: Annotated[
        bool,
        Query(description="Include OCR tokens and variant failures. Use only for testing."),
    ] = False,
):
    """Screen an uploaded PAN image and return the screening outcome.

    With ``debug`` false the compact public response is returned; with
    ``debug`` true the full engine report is returned instead.

    Raises:
        HTTPException: 415 for a non-image content type, 413 when the upload
            exceeds ``MAX_UPLOAD_MB``, 422 when the image cannot be decoded or
            is out of bounds, and 503 on any unexpected analysis failure.
    """
    content_type = (file.content_type or "").lower()
    if content_type and not (
        content_type.startswith("image/") or content_type == "application/octet-stream"
    ):
        raise HTTPException(status_code=415, detail="Upload must be an image file.")

    try:
        # Read one byte past the limit so an oversized upload is detectable
        # without buffering the whole body.
        image_bytes = await file.read(MAX_UPLOAD_BYTES + 1)
    finally:
        await file.close()

    if len(image_bytes) > MAX_UPLOAD_BYTES:
        raise HTTPException(
            status_code=413,
            detail=f"Image exceeds the {MAX_UPLOAD_MB} MB upload limit.",
        )

    # Keep only the final path component of the client-supplied name.
    safe_filename = Path(file.filename or "uploaded-image").name
    engine: PanKycEngine = request.app.state.engine

    try:
        result = await run_in_threadpool(
            engine.analyze_bytes,
            image_bytes,
            safe_filename,
            include_full_pan=include_full_pan,
            debug=debug,
        )
    except InvalidImageError as error:
        raise HTTPException(status_code=422, detail=str(error)) from error
    except Exception as error:
        API_LOGGER.exception("Unexpected PAN analysis failure")
        raise HTTPException(
            status_code=503,
            detail=f"PAN analysis service failed: {type(error).__name__}",
        ) from error

    # Return the detailed internal report only when debug=true.
    if debug:
        status_code = 503 if result.get("decision") == "error" else 200
        return JSONResponse(status_code=status_code, content=result)

    return _compact_response(result)