# Hugging Face Spaces page residue (status: Sleeping) — kept as a comment so the file parses.
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import cv2 | |
| import numpy as np | |
| from pyzbar import pyzbar | |
| from PIL import Image | |
| import fitz # PyMuPDF | |
| import json | |
| import base64 | |
| import logging | |
| import re | |
| import zlib | |
| import asyncio | |
| import warnings | |
| import os | |
| import hashlib | |
| import time | |
| from typing import Optional, Dict, Any, List, Tuple | |
| from datetime import datetime | |
| from concurrent.futures import ThreadPoolExecutor | |
| import uuid | |
# ============ Suppress Warnings ============
# Blanket-suppress Python warnings (pyzbar/OpenCV are noisy during decode)
# and silence zbar's own verbose output via its environment switch.
warnings.filterwarnings('ignore')
os.environ['ZBAR_VERBOSE'] = '0'
# Suppress stderr on Windows
import sys
if sys.platform == 'win32':
    import ctypes
    try:
        # Loads kernel32 up front; presumably groundwork for console/stderr
        # manipulation on Windows — the handle is not used further here.
        # NOTE(review): confirm this is still needed, or remove.
        kernel32 = ctypes.WinDLL('kernel32')
    except:
        pass
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - [%(levelname)s] - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI(title="E-Invoice QR Extractor API", version="2.3.0")
# ============ Configuration ============
# When True, a non-E-Invoice QR (e.g. a payment QR) is returned if no
# E-Invoice QR is found; when False such files report "no QR".
ALLOW_PAYMENT_FALLBACK = False
# DPI for the cheap first-pass thumbnail render used to locate QR boxes.
THUMBNAIL_DPI = 220
# Fractional padding added around each detected QR box before re-rendering.
CROP_MARGIN = 0.15
MAX_CONCURRENT_REQUESTS = 5  # Limit concurrent processing
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
PROCESSING_TIMEOUT = 300  # 5 minutes
# ============ Concurrency Control ============
# Semaphore caps in-flight requests; the thread pool runs the CPU-bound
# CV/PDF work off the event loop with the same parallelism bound.
processing_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_REQUESTS)
# ============ In-Memory Tracking (simple) ============
# request_id -> metadata/status dict; per-process only, pruned after 1 hour.
request_tracking: Dict[str, Dict[str, Any]] = {}
# ============ CORS ============
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============ Helper Functions ============
def get_file_hash(content: bytes) -> str:
    """Return the hex MD5 digest of *content* (used only for deduplication)."""
    digest = hashlib.md5()
    digest.update(content)
    return digest.hexdigest()
def track_request(request_id: str, data: Dict[str, Any]):
    """Register a new in-flight request in the in-memory tracking table."""
    entry = dict(data)
    entry["created_at"] = datetime.now().isoformat()
    entry["status"] = "processing"
    request_tracking[request_id] = entry
def update_request(request_id: str, status: str, result: Optional[Dict] = None):
    """Update the status (and optionally the result) of a tracked request.

    Unknown request ids are ignored silently (the entry may have been pruned
    by cleanup_old_tracking).  Also stamps ``updated_at``.
    """
    entry = request_tracking.get(request_id)
    if entry is None:
        return
    entry["status"] = status
    entry["updated_at"] = datetime.now().isoformat()
    # `is not None` rather than truthiness: an empty result dict should
    # still be recorded, not silently dropped.
    if result is not None:
        entry["result"] = result
def cleanup_old_tracking():
    """Drop tracking entries whose creation time is more than one hour old."""
    cutoff = time.time() - 3600
    stale = [
        req_id
        for req_id, data in request_tracking.items()
        if datetime.fromisoformat(data["created_at"]).timestamp() < cutoff
    ]
    for req_id in stale:
        del request_tracking[req_id]
# ============ Decoding Helpers ============
def _json_dict_from_bytes(b: bytes) -> Optional[Dict[str, Any]]:
    """Parse *b* as a JSON object, trying raw UTF-8 first and then
    zlib-decompressed UTF-8.  Returns the dict, or None on any failure
    (including JSON that parses to a non-dict value)."""
    try:
        obj = json.loads(b.decode("utf-8"))
        if isinstance(obj, dict):
            return obj
    except Exception:
        pass
    try:
        obj = json.loads(zlib.decompress(b).decode("utf-8"))
        if isinstance(obj, dict):
            return obj
    except Exception:
        pass
    return None

def decode_gst_qr(raw_data: str) -> Dict[str, Any]:
    """Decode a GST QR payload into a dict.

    Tries, in order: plain JSON, a JWT-like token (base64url payload between
    dots, optionally zlib-compressed), and plain Base64 (optionally
    zlib-compressed).  Always returns a dict: undecodable or non-object
    payloads come back as ``{"raw": <original string>}``.

    Fix vs. previous revision: JSON that parsed to a scalar or list (e.g. a
    purely numeric QR payload) used to be returned as-is, which later crashed
    callers that subscript the result; such values now fall through to the
    ``{"raw": ...}`` fallback.
    """
    s = raw_data or ""
    # 1) Plain JSON object
    try:
        obj = json.loads(s)
        if isinstance(obj, dict):
            return obj
    except Exception:
        pass
    # 2) JWT-like token: the payload is the base64url segment between dots.
    if "." in s:
        parts = s.split(".")
        if len(parts) >= 2:
            payload = parts[1]
            # Restore padding that JWTs strip.
            payload += "=" * ((4 - len(payload) % 4) % 4)
            try:
                decoded = _json_dict_from_bytes(base64.urlsafe_b64decode(payload))
            except Exception:
                decoded = None
            if decoded is not None:
                return decoded
    # 3) Plain Base64 (optionally zlib-compressed)
    try:
        padded = s + "=" * ((4 - len(s) % 4) % 4)
        decoded = _json_dict_from_bytes(base64.b64decode(padded))
    except Exception:
        decoded = None
    if decoded is not None:
        return decoded
    return {"raw": s}
# ============ Classification ============
GSTIN_REGEX = re.compile(r"\b\d{2}[A-Z]{5}\d{4}[A-Z]\dZ[A-Z0-9]\b")
def classify_qr(raw_text: str, decoded_obj: Optional[Dict[str, Any]] = None) -> str:
    """Classify a QR payload as 'einvoice', 'payment' or 'unknown'.

    *decoded_obj*, when provided (and truthy), is used instead of decoding
    *raw_text* a second time.
    """
    text = (raw_text or "").strip()
    lower = text.lower()
    # --- Payment QR signatures (checked first) ---
    payment_markers = ("paytm", "phonepe", "gpay", "googlepay", "bharatqr", "bhim upi", "upi://collect")
    is_upi = lower.startswith("upi://") or "upi://pay" in lower
    is_emv = text.startswith("000201")  # EMVCo payload-format indicator prefix
    if is_upi or is_emv or any(marker in lower for marker in payment_markers):
        return "payment"
    # --- E-Invoice signatures ---
    parsed = decoded_obj if decoded_obj else decode_gst_qr(text)
    if isinstance(parsed, dict):
        einvoice_keys = {"irn", "sellergstin", "buyergstin", "docno", "docdt", "totinvval", "mainhsncode", "signedqrcode"}
        if any(key.lower() in einvoice_keys for key in parsed):
            return "einvoice"
    if GSTIN_REGEX.search(text):
        return "einvoice"
    # Long payloads that are not payment strings are assumed to be signed invoices.
    if len(text) >= 200 and not (lower.startswith("upi://") or text.startswith("000201")):
        return "einvoice"
    return "unknown"
def pick_einvoice_first(payloads: List[str]) -> Optional[Dict[str, Any]]:
    """Return the first payload classified as an E-Invoice QR, decoded.

    If none is an E-Invoice and ALLOW_PAYMENT_FALLBACK is set, the first
    decoded QR is returned annotated with its classification and a note;
    otherwise None.  The returned dict gains a ``_classification`` key.
    """
    first_any: Optional[Tuple[Dict[str, Any], str]] = None
    for raw in payloads:
        dec = decode_gst_qr(raw)
        # Defensive: if the decoder ever hands back a non-dict (e.g. a bare
        # JSON scalar), wrap it so the annotation writes below cannot raise.
        if not isinstance(dec, dict):
            dec = {"raw": raw}
        cls = classify_qr(raw, dec)
        if first_any is None:
            first_any = (dec, cls)
        if cls == "einvoice":
            dec["_classification"] = "einvoice"
            return dec
    if ALLOW_PAYMENT_FALLBACK and first_any:
        dec, cls = first_any
        dec["_classification"] = cls
        dec["_note"] = "No E-Invoice QR; returning first available QR"
        return dec
    return None
# ============ Image Processing ============
def enhance_image_for_qr(image: np.ndarray) -> np.ndarray:
    """Return a binarized, contrast-boosted single-channel copy of *image*
    to improve QR detection on low-quality scans."""
    # Grayscale, then local contrast equalization (CLAHE).
    work = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    work = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(work)
    # Denoise, then sharpen with a 3x3 high-pass kernel.
    work = cv2.fastNlMeansDenoising(work, None, 10, 7, 21)
    sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    work = cv2.filter2D(work, -1, sharpen_kernel)
    # Adaptive threshold copes with uneven lighting across the page.
    return cv2.adaptiveThreshold(work, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
# ============ QR Decoders ============
def detect_all_qr_opencv(image: np.ndarray) -> List[str]:
    """Collect every QR payload OpenCV can decode from *image*.

    Runs the multi-code decoder first, then the single-code decoder as a
    supplement; either may raise on odd inputs, so both are best-effort.
    """
    found: List[str] = []
    detector = cv2.QRCodeDetector()
    try:
        texts, _, _ = detector.detectAndDecodeMulti(image)
        if isinstance(texts, (list, tuple)):
            found.extend(t for t in texts if t)
    except Exception:
        pass
    try:
        text, _, _ = detector.detectAndDecode(image)
        if text:
            found.append(text)
    except Exception:
        pass
    return found
def detect_all_qr_pyzbar(image: np.ndarray) -> List[str]:
    """Collect every QR payload pyzbar can decode from *image*, muting the
    warnings the decoder tends to emit.  Non-QR symbols are skipped."""
    found: List[str] = []
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            symbols = pyzbar.decode(image)
        for sym in symbols:
            if sym.type != 'QRCODE':
                continue
            try:
                text = sym.data.decode('utf-8', errors='ignore')
            except Exception:
                continue
            if text:
                found.append(text)
    except Exception:
        pass
    return found
def dedupe_keep_order(items: List[str]) -> List[str]:
    """Drop duplicate payloads while preserving first-seen order.

    Relies on dict insertion-order preservation (guaranteed since 3.7).
    """
    return list(dict.fromkeys(items))
# ============ Image QR Extraction ============
def extract_qr_from_image(image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Decode all QR codes in an encoded image and return the preferred one.

    Three escalating passes: the raw image, an enhanced (binarized) version,
    and finally likely QR regions (top-right quadrant, right third).  The
    decoded payloads are deduplicated and the E-Invoice QR is preferred.
    Returns None when the bytes are not a decodable image or hold no QR.
    """
    buffer = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if img is None:
        return None
    # Pass 1: raw image through both decoders.
    payloads: List[str] = detect_all_qr_opencv(img) + detect_all_qr_pyzbar(img)
    # Pass 2: enhanced image, only if nothing decoded yet.  OpenCV gets a
    # 3-channel view; pyzbar takes the grayscale directly.
    if not payloads:
        enhanced = enhance_image_for_qr(img)
        payloads += detect_all_qr_opencv(cv2.cvtColor(enhanced, cv2.COLOR_GRAY2BGR))
        payloads += detect_all_qr_pyzbar(enhanced)
    payloads = dedupe_keep_order(payloads)
    # Pass 3: crop to regions where invoice QRs usually sit.
    if not payloads:
        h, w = img.shape[:2]
        candidate_regions = (
            img[0:int(h * 0.45), int(w * 0.55):w],  # top-right quadrant
            img[:, int(w * 0.66):w],                # right third, full height
        )
        for roi in candidate_regions:
            if roi.size == 0:
                continue
            hits = detect_all_qr_opencv(roi) + detect_all_qr_pyzbar(roi)
            payloads += hits
            if hits:
                break
        payloads = dedupe_keep_order(payloads)
    if not payloads:
        return None
    return pick_einvoice_first(payloads)
# ============ PDF Processing ============
def detect_boxes_on_thumbnail(page: fitz.Page, thumb_dpi: int = THUMBNAIL_DPI) -> List[fitz.Rect]:
    """Detect QR boxes on thumbnail.

    Renders *page* at a low DPI, runs OpenCV's QR *detector* (detect only,
    no decode) on the raster, and maps each detected quad back to PDF user
    space, padded by CROP_MARGIN on every side.  Returns a possibly-empty
    list of page-space rectangles clipped to the page bounds.
    """
    zoom = thumb_dpi / 72.0  # PDF user space is 72 units per inch
    mat = fitz.Matrix(zoom, zoom)
    pix = page.get_pixmap(matrix=mat, alpha=False)
    # Reinterpret the raw sample buffer as an (H, W, channels) array.
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
    if pix.n == 4:
        # NOTE(review): with alpha=False a 4-channel pixmap is unexpected;
        # conversion kept as a safeguard — confirm whether this branch fires.
        img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
    boxes: List[fitz.Rect] = []
    det = cv2.QRCodeDetector()
    try:
        ok, pts = det.detect(img)
        if ok and pts is not None:
            for poly in np.array(pts):
                # Axis-aligned bounding box of the detected quad, in pixels.
                xs, ys = poly[:, 0], poly[:, 1]
                x1, y1, x2, y2 = float(xs.min()), float(ys.min()), float(xs.max()), float(ys.max())
                w, h = x2 - x1, y2 - y1
                # Pad so the later high-DPI render captures the whole code.
                x1 -= CROP_MARGIN * w
                y1 -= CROP_MARGIN * h
                x2 += CROP_MARGIN * w
                y2 += CROP_MARGIN * h
                # Back to PDF space; intersecting with page.rect clips the padding.
                rect_pdf = fitz.Rect(x1/zoom, y1/zoom, x2/zoom, y2/zoom) & page.rect
                if not rect_pdf.is_empty:
                    boxes.append(rect_pdf)
    except Exception as e:
        logger.debug(f"Thumbnail detect failed: {e}")
    return boxes
def thumbnail_scan_all_pages(pdf_bytes: bytes, pages: Optional[List[int]] = None, thumb_dpi: int = THUMBNAIL_DPI) -> Dict[int, List[fitz.Rect]]:
    """Scan thumbnails for QR codes.

    Returns a mapping of 1-based page number -> candidate QR rectangles for
    every page where the low-DPI pass detected at least one QR-like quad.
    *pages* is an optional 1-based whitelist; out-of-range entries are
    ignored.  The document handle is always closed via the finally block.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    total = doc.page_count
    if pages:
        # Convert 1-based page numbers to 0-based indices, dropping invalid ones.
        indices = [p-1 for p in pages if 1 <= p <= total]
    else:
        indices = list(range(total))
    hits: Dict[int, List[fitz.Rect]] = {}
    try:
        for i in indices:
            page = doc[i]
            rects = detect_boxes_on_thumbnail(page, thumb_dpi)
            if rects:
                hits[i+1] = rects  # keys are 1-based page numbers
    finally:
        doc.close()
    return hits
def extract_qr_from_pdf(pdf_bytes: bytes, dpi: int, pages: Optional[List[int]] = None, scan_all_first: bool = True) -> Optional[Dict[str, Any]]:
    """Extract the (E-Invoice-preferred) QR payload from a PDF.

    Strategy: optionally pre-scan low-DPI thumbnails to find candidate QR
    boxes, then render only those clips at full *dpi*; fall back to rendering
    whole candidate pages.  Returns the decoded dict annotated with
    ``_page_number``, ``_dpi_used`` and ``_clip`` (clip coords or None), or
    None when no QR is found or processing fails (failure is logged).

    Fix vs. previous revision: the document handle is closed exactly once via
    try/finally instead of ad-hoc close() calls on every return path, so it
    cannot leak when rendering raises.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        total = doc.page_count
        if pages:
            target_pages = [p for p in pages if 1 <= p <= total]
        else:
            target_pages = list(range(1, total + 1))
        logger.debug(f"[PDF] target_pages={target_pages}, dpi={dpi}")
        zoom = dpi / 72.0
        mat = fitz.Matrix(zoom, zoom)
        page_boxes_map: Dict[int, List[fitz.Rect]] = {}
        if scan_all_first:
            page_boxes_map = thumbnail_scan_all_pages(pdf_bytes, pages=target_pages, thumb_dpi=THUMBNAIL_DPI)
        # If the thumbnail scan found boxes, only those pages are candidates.
        candidate_pages = list(page_boxes_map.keys()) if page_boxes_map else target_pages
        for p1 in candidate_pages:
            page = doc[p1 - 1]
            # First try the cropped candidate boxes (cheap, high hit-rate)...
            for clip in page_boxes_map.get(p1, []):
                pix = page.get_pixmap(matrix=mat, clip=clip, alpha=False)
                res = extract_qr_from_image(pix.tobytes("png"))
                if res:
                    res["_page_number"] = p1
                    res["_dpi_used"] = dpi
                    res["_clip"] = [clip.x0, clip.y0, clip.x1, clip.y1]
                    return res
            # ...then the full page at target DPI.
            pix = page.get_pixmap(matrix=mat, alpha=False)
            res = extract_qr_from_image(pix.tobytes("png"))
            if res:
                res["_page_number"] = p1
                res["_dpi_used"] = dpi
                res["_clip"] = None
                return res
        return None
    except Exception as e:
        logger.error(f"[PDF] Processing error: {e}")
        return None
    finally:
        doc.close()
# ============ Processing with Timeout ============
async def process_file_with_timeout(content: bytes, filename: str, dpi: int, pages: str, scan_all_first: bool) -> Dict[str, Any]:
    """Process one uploaded file (PDF or image) with timeout protection.

    Runs the CPU-bound extraction in the shared thread pool and bounds it
    with PROCESSING_TIMEOUT.  Always returns a result dict with ``success``,
    ``qr_data``, ``message`` and ``filename`` keys (plus ``error`` on
    timeout/failure) — it never raises.
    """
    # get_running_loop(): this coroutine always runs inside the event loop,
    # and get_event_loop() is deprecated for this use since 3.10.
    loop = asyncio.get_running_loop()
    try:
        ext = (filename or "").lower().split(".")[-1]
        # Trust either the extension or the PDF magic bytes.
        is_pdf = (ext == "pdf") or content.startswith(b"%PDF")
        if is_pdf:
            # "1, 3,5" -> [1, 3, 5]; non-numeric tokens are ignored.
            page_list: List[int] = [
                int(tok) for tok in ((pages or "").split(","))
                if tok.strip().isdigit()
                for tok in (tok.strip(),)
            ]
            result = await asyncio.wait_for(
                loop.run_in_executor(executor, extract_qr_from_pdf, content, dpi, page_list or None, scan_all_first),
                timeout=PROCESSING_TIMEOUT
            )
        else:
            result = await asyncio.wait_for(
                loop.run_in_executor(executor, extract_qr_from_image, content),
                timeout=PROCESSING_TIMEOUT
            )
        if result:
            return {
                "success": True,
                "qr_data": result,
                "message": "QR code extracted successfully (E-Invoice preferred)",
                "filename": filename
            }
        else:
            return {
                "success": False,
                "qr_data": None,
                "message": "No QR code found in the document",
                "filename": filename
            }
    except asyncio.TimeoutError:
        # Fix: log the actual filename instead of the "(unknown)" placeholder.
        logger.error(f"Timeout processing {filename!r}")
        return {
            "success": False,
            "qr_data": None,
            "message": "Processing timeout exceeded",
            "filename": filename,
            "error": "timeout"
        }
    except Exception as e:
        logger.error(f"Error processing {filename!r}: {e}")
        return {
            "success": False,
            "qr_data": None,
            "message": f"Error processing file: {str(e)}",
            "filename": filename,
            "error": str(e)
        }
# ============ API Endpoints ============
# Fix: the route decorator was missing, so the endpoint advertised by this
# very response map was never registered.
@app.get("/")
def root():
    """Service metadata: version, operational limits and the endpoint map."""
    return {
        "service": "E-Invoice QR Extractor API",
        "version": "2.3.0",
        "features": {
            "single_file": True,
            "batch_upload": True,
            "concurrency_limit": MAX_CONCURRENT_REQUESTS,
            "max_file_size": f"{MAX_FILE_SIZE / (1024*1024)}MB",
            "timeout": f"{PROCESSING_TIMEOUT}s"
        },
        "endpoints": {
            "single": "POST /extract-qr",
            "batch": "POST /batch-extract",
            "tracking": "GET /tracking/{request_id}",
            "stats": "GET /stats",
            "health": "GET /health"
        }
    }
# Fix: the route decorator was missing; registered per the map in root().
@app.post("/extract-qr")
async def extract_qr(
    file: UploadFile = File(...),
    dpi: int = 1200,
    pages: str = "",
    scan_all_first: bool = True
):
    """
    Single file QR extraction with concurrency control.

    Enforces MAX_FILE_SIZE (413), returns 200 with the decoded QR payload,
    404 when no QR is found, or 500 on processing errors.  Each request is
    recorded in the in-memory tracking table under a fresh UUID.
    """
    request_id = str(uuid.uuid4())
    async with processing_semaphore:  # cap concurrent heavy processing
        try:
            content = await file.read()
            file_size = len(content)
            if file_size > MAX_FILE_SIZE:
                raise HTTPException(status_code=413, detail="File too large")
            file_hash = get_file_hash(content)
            logger.info(f"[{request_id}] Processing: {file.filename}, size: {file_size} bytes")
            # Track request
            track_request(request_id, {
                "filename": file.filename,
                "file_size": file_size,
                "file_hash": file_hash,
                "dpi": dpi,
                "pages": pages
            })
            # Process with timeout
            result = await process_file_with_timeout(content, file.filename, dpi, pages, scan_all_first)
            # Update tracking
            update_request(request_id, "completed" if result["success"] else "no_qr_found", result)
            logger.info(f"[{request_id}] Completed: {result['success']}")
            if result["success"]:
                return JSONResponse(status_code=200, content={
                    "success": True,
                    "qr_data": result["qr_data"],
                    "message": result["message"],
                    "request_id": request_id
                })
            else:
                return JSONResponse(status_code=404, content={
                    "success": False,
                    "qr_data": None,
                    "message": result["message"],
                    "request_id": request_id
                })
        except HTTPException as http_exc:
            # Fix: record the exception's actual detail instead of hard-coding
            # "File too large" for every HTTPException.
            update_request(request_id, "failed", {"error": str(http_exc.detail)})
            raise
        except Exception as e:
            logger.error(f"[{request_id}] Error: {e}")
            update_request(request_id, "failed", {"error": str(e)})
            raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
# Fix: the route decorator was missing; registered per the map in root().
@app.post("/batch-extract")
async def batch_extract(
    files: List[UploadFile] = File(...),
    dpi: int = 1200,
    pages: str = ""
):
    """
    Batch file upload - process multiple files concurrently.

    All files are fanned out with asyncio.gather; real parallelism is capped
    by the shared semaphore.  Per-file failures (size limit, processing
    errors) are returned in-line and never abort the batch.
    """
    batch_id = str(uuid.uuid4())
    logger.info(f"[BATCH {batch_id}] Received {len(files)} files")
    # Per-file worker; the semaphore limits concurrent heavy processing.
    async def process_single(file: UploadFile, index: int):
        async with processing_semaphore:
            try:
                content = await file.read()
                file_size = len(content)
                if file_size > MAX_FILE_SIZE:
                    return {
                        "filename": file.filename,
                        "index": index,
                        "success": False,
                        "error": "File too large",
                        "file_size": file_size
                    }
                logger.info(f"[BATCH {batch_id}] [{index+1}/{len(files)}] Processing: {file.filename}")
                result = await process_file_with_timeout(content, file.filename, dpi, pages, True)
                result["index"] = index
                result["file_size"] = file_size
                return result
            except Exception as e:
                logger.error(f"[BATCH {batch_id}] Error processing {file.filename}: {e}")
                return {
                    "filename": file.filename,
                    "index": index,
                    "success": False,
                    "error": str(e)
                }
    # Process all files concurrently (limited by semaphore).
    # (Removed a dead `results = []` that was immediately overwritten.)
    tasks = [process_single(file, i) for i, file in enumerate(files)]
    results = await asyncio.gather(*tasks)
    # Summary
    successful = sum(1 for r in results if r.get("success"))
    failed = len(results) - successful
    logger.info(f"[BATCH {batch_id}] Completed: {successful} successful, {failed} failed")
    return {
        "batch_id": batch_id,
        "total_files": len(files),
        "successful": successful,
        "failed": failed,
        "results": results
    }
# Fix: the route decorator was missing; registered per the map in root().
@app.get("/tracking/{request_id}")
def get_tracking(request_id: str):
    """Return the tracking record for *request_id*; 404 if unknown or expired."""
    if request_id not in request_tracking:
        raise HTTPException(status_code=404, detail="Request not found")
    return request_tracking[request_id]
# Fix: the route decorator was missing; registered per the map in root().
@app.get("/stats")
def get_stats():
    """Current processing statistics, load and configuration.

    Also prunes tracking entries older than one hour as a side effect, so
    the counts reflect only the recent window.
    """
    cleanup_old_tracking()
    total_requests = len(request_tracking)
    completed = sum(1 for r in request_tracking.values() if r["status"] == "completed")
    failed = sum(1 for r in request_tracking.values() if r["status"] == "failed")
    processing = sum(1 for r in request_tracking.values() if r["status"] == "processing")
    return {
        "current_load": {
            "processing": processing,
            "available_slots": MAX_CONCURRENT_REQUESTS - processing,
            "max_concurrent": MAX_CONCURRENT_REQUESTS
        },
        "statistics": {
            "total_requests": total_requests,
            "completed": completed,
            "failed": failed,
            "no_qr_found": sum(1 for r in request_tracking.values() if r["status"] == "no_qr_found")
        },
        "configuration": {
            "max_file_size": f"{MAX_FILE_SIZE / (1024*1024)}MB",
            "processing_timeout": f"{PROCESSING_TIMEOUT}s",
            "thumbnail_dpi": THUMBNAIL_DPI,
            "max_concurrent": MAX_CONCURRENT_REQUESTS
        }
    }
# Fix: the route decorator was missing; registered per the map in root().
@app.get("/health")
def health_check():
    """Lightweight liveness probe reporting current load."""
    processing = sum(1 for r in request_tracking.values() if r["status"] == "processing")
    return {
        "status": "healthy",
        "processing": processing,
        "available_slots": MAX_CONCURRENT_REQUESTS - processing
    }
if __name__ == "__main__":
    import uvicorn
    # Single worker on purpose: the tracking dict and semaphore are
    # per-process state and would not be shared across workers.
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info", workers=1)