"""E-Invoice QR Extractor API.

FastAPI service that extracts QR codes from uploaded images and PDFs,
preferring GST e-invoice QR payloads over payment (UPI / BharatQR) codes.
"""

import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import sys
import time
import uuid
import warnings
import zlib
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import cv2
import fitz  # PyMuPDF
import numpy as np
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from PIL import Image
from pyzbar import pyzbar

# ============ Suppress Warnings ============
warnings.filterwarnings('ignore')
os.environ['ZBAR_VERBOSE'] = '0'

# Suppress stderr on Windows
# NOTE(review): this only loads a kernel32 handle; nothing redirects stderr yet.
if sys.platform == 'win32':
    import ctypes
    try:
        kernel32 = ctypes.WinDLL('kernel32')
    except OSError:
        pass

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - [%(levelname)s] - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(title="E-Invoice QR Extractor API", version="2.3.0")

# ============ Configuration ============
ALLOW_PAYMENT_FALLBACK = False
THUMBNAIL_DPI = 220
CROP_MARGIN = 0.15
MAX_CONCURRENT_REQUESTS = 5  # Limit concurrent processing
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB
PROCESSING_TIMEOUT = 300  # 5 minutes

# ============ Concurrency Control ============
processing_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_REQUESTS)

# ============ In-Memory Tracking (simple) ============
request_tracking: Dict[str, Dict[str, Any]] = {}

# ============ CORS ============
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ============ Helper Functions ============
def get_file_hash(content: bytes) -> str:
    """Generate MD5 hash for file deduplication (not a security digest)."""
    return hashlib.md5(content).hexdigest()


def track_request(request_id: str, data: Dict[str, Any]):
    """Register a new request in the in-memory tracking table.

    Expired entries are purged first so the table cannot grow without bound
    when nobody polls the ``/stats`` endpoint.
    """
    cleanup_old_tracking()
    request_tracking[request_id] = {
        **data,
        "created_at": datetime.now().isoformat(),
        "status": "processing"
    }


def update_request(request_id: str, status: str, result: Optional[Dict] = None):
    """Update the status (and optionally the result) of a tracked request.

    Unknown request ids are ignored.
    """
    if request_id in request_tracking:
        request_tracking[request_id]["status"] = status
        request_tracking[request_id]["updated_at"] = datetime.now().isoformat()
        if result:
            request_tracking[request_id]["result"] = result


def cleanup_old_tracking():
    """Remove tracking entries created more than one hour ago."""
    cutoff = time.time() - 3600
    # Iterate over a snapshot: sync endpoints run in a worker thread while
    # coroutines on the event loop may insert entries concurrently.
    expired = [
        req_id
        for req_id, data in list(request_tracking.items())
        if datetime.fromisoformat(data["created_at"]).timestamp() < cutoff
    ]
    for req_id in expired:
        request_tracking.pop(req_id, None)


# ============ Decoding Helpers ============
def _json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return *text* parsed as a JSON object, or None if it is not one."""
    try:
        obj = json.loads(text)
    except (ValueError, RecursionError):
        return None
    # Scalars and arrays are valid JSON but not a usable QR payload: callers
    # attach metadata keys to the result, so only dicts are accepted.
    return obj if isinstance(obj, dict) else None


def _json_object_from_bytes(blob: bytes) -> Optional[Dict[str, Any]]:
    """Parse *blob* as UTF-8 JSON, retrying once after zlib decompression."""
    try:
        obj = _json_object(blob.decode("utf-8"))
    except UnicodeDecodeError:
        obj = None
    if obj is not None:
        return obj
    try:
        inflated = zlib.decompress(blob)
    except zlib.error:
        return None
    try:
        return _json_object(inflated.decode("utf-8"))
    except UnicodeDecodeError:
        return None


def decode_gst_qr(raw_data: str) -> Dict[str, Any]:
    """Decode GST QR (JSON / JWT-like base64url / Base64 / Base64+zlib).

    The formats are tried in that order. The result is always a dict; when
    nothing decodes to a JSON object, ``{"raw": <original text>}`` is returned.
    """
    s = raw_data or ""

    # JSON
    obj = _json_object(s)
    if obj is not None:
        return obj

    # JWT-like base64url payload (header.payload.signature)
    if "." in s:
        payload = s.split(".")[1]
        payload += "=" * ((4 - len(payload) % 4) % 4)
        try:
            blob = base64.urlsafe_b64decode(payload)
        except ValueError:
            blob = None
        if blob is not None:
            obj = _json_object_from_bytes(blob)
            if obj is not None:
                return obj

    # Plain Base64
    padded = s + "=" * ((4 - len(s) % 4) % 4)
    try:
        blob = base64.b64decode(padded)
    except ValueError:
        blob = None
    if blob is not None:
        obj = _json_object_from_bytes(blob)
        if obj is not None:
            return obj

    return {"raw": s}


# ============ Classification ============
GSTIN_REGEX = re.compile(r"\b\d{2}[A-Z]{5}\d{4}[A-Z]\dZ[A-Z0-9]\b")


def classify_qr(raw_text: str, decoded_obj: Optional[Dict[str, Any]] = None) -> str:
    """Return 'einvoice' | 'payment' | 'unknown'.

    Payment markers are checked first; a payload is treated as an e-invoice
    when its decoded keys, a GSTIN-shaped token, or its length (>= 200
    characters) suggest one.
    """
    text = (raw_text or "").strip()
    lower = text.lower()

    # Payment patterns
    if lower.startswith("upi://") or "upi://pay" in lower:
        return "payment"
    if text.startswith("000201"):
        return "payment"
    if any(k in lower for k in ["paytm", "phonepe", "gpay", "googlepay",
                                "bharatqr", "bhim upi", "upi://collect"]):
        return "payment"

    # E-Invoice patterns
    parsed = decoded_obj or decode_gst_qr(text)
    if isinstance(parsed, dict):
        keys = {k.lower() for k in parsed.keys()}
        e_keys = {"irn", "sellergstin", "buyergstin", "docno", "docdt",
                  "totinvval", "mainhsncode", "signedqrcode"}
        if keys & e_keys:
            return "einvoice"
    if GSTIN_REGEX.search(text):
        return "einvoice"
    if len(text) >= 200 and not (lower.startswith("upi://") or text.startswith("000201")):
        return "einvoice"
    return "unknown"


def pick_einvoice_first(payloads: List[str]) -> Optional[Dict[str, Any]]:
    """Prefer E-Invoice QR over others.

    Returns the first payload classified as an e-invoice, tagged with
    ``_classification``. If none qualifies, the first payload of any kind is
    returned only when ALLOW_PAYMENT_FALLBACK is enabled; otherwise None.
    """
    first_any = None
    for raw in payloads:
        dec = decode_gst_qr(raw)
        cls = classify_qr(raw, dec)
        if first_any is None:
            first_any = (dec, cls)
        if cls == "einvoice":
            dec["_classification"] = "einvoice"
            return dec

    if ALLOW_PAYMENT_FALLBACK and first_any:
        dec, cls = first_any
        dec["_classification"] = cls
        dec["_note"] = "No E-Invoice QR; returning first available QR"
        return dec
    return None


# ============ Image Processing ============
def enhance_image_for_qr(image: np.ndarray) -> np.ndarray:
    """Enhance image for better QR detection.

    Pipeline: grayscale -> CLAHE -> non-local-means denoise -> sharpen ->
    adaptive threshold. Returns a single-channel binary image.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(gray)
    denoised = cv2.fastNlMeansDenoising(enhanced, None, 10, 7, 21)
    kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    sharpened = cv2.filter2D(denoised, -1, kernel)
    binary = cv2.adaptiveThreshold(sharpened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 11, 2)
    return binary


# ============ QR Decoders ============
def detect_all_qr_opencv(image: np.ndarray) -> List[str]:
    """Detect multiple QR codes using OpenCV.

    Runs the multi-code detector and then the single-code detector; the
    combined list may contain duplicates, which callers de-duplicate.
    """
    res: List[str] = []
    det = cv2.QRCodeDetector()

    try:
        # The Python binding returns four values:
        # (retval, decoded_info, points, straight_qrcode).
        _ok, data_list, _pts, _straight = det.detectAndDecodeMulti(image)
        if isinstance(data_list, (list, tuple)):
            res.extend([d for d in data_list if d])
    except Exception as e:
        # Best effort: a failure here must not prevent the other decoders.
        logger.debug(f"OpenCV multi-QR decode failed: {e}")

    try:
        d, _, _ = det.detectAndDecode(image)
        if d:
            res.append(d)
    except Exception as e:
        logger.debug(f"OpenCV single-QR decode failed: {e}")
    return res


def detect_all_qr_pyzbar(image: np.ndarray) -> List[str]:
    """Detect multiple QR codes using pyzbar (with warning suppression).

    Only symbols of type 'QRCODE' are kept; undecodable bytes are dropped.
    """
    res: List[str] = []
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            decoded_objects = pyzbar.decode(image)
        for obj in decoded_objects:
            if obj.type == 'QRCODE':
                s = obj.data.decode('utf-8', errors='ignore')
                if s:
                    res.append(s)
    except Exception as e:
        # Best effort: pyzbar problems must not abort the extraction.
        logger.debug(f"pyzbar decode failed: {e}")
    return res


def dedupe_keep_order(items: List[str]) -> List[str]:
    """Return *items* without duplicates, keeping first-occurrence order."""
    seen = set()
    out: List[str] = []
    for it in items:
        if it not in seen:
            seen.add(it)
            out.append(it)
    return out


# ============ Image QR Extraction ============
def extract_qr_from_image(image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Decode ALL QRs and return E-Invoice preferred.

    Tries the raw image first, then an enhanced version, then two
    right-hand crops. Returns None when the image cannot be decoded or no
    acceptable QR payload is found.
    """
    nparr = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        return None

    all_payloads: List[str] = []
    all_payloads += detect_all_qr_opencv(img)
    all_payloads += detect_all_qr_pyzbar(img)

    if not all_payloads:
        enh = enhance_image_for_qr(img)
        enh_bgr = cv2.cvtColor(enh, cv2.COLOR_GRAY2BGR)
        all_payloads += detect_all_qr_opencv(enh_bgr)
        all_payloads += detect_all_qr_pyzbar(enh)

    all_payloads = dedupe_keep_order(all_payloads)

    if not all_payloads:
        h, w = img.shape[:2]
        # Fallback crops: top-right corner, then the right-hand third.
        regions = [
            img[0:int(h*0.45), int(w*0.55):w],
            img[:, int(w*0.66):w],
        ]
        for roi in regions:
            if roi.size == 0:
                continue
            p = detect_all_qr_opencv(roi) + detect_all_qr_pyzbar(roi)
            all_payloads += p
            if p:
                break
        all_payloads = dedupe_keep_order(all_payloads)

    if not all_payloads:
        return None
    return pick_einvoice_first(all_payloads)


# ============ PDF Processing ============
def detect_boxes_on_thumbnail(page: fitz.Page, thumb_dpi: int = THUMBNAIL_DPI) -> List[fitz.Rect]:
    """Detect QR boxes on thumbnail.

    Renders *page* at *thumb_dpi*, locates a QR code with OpenCV and returns
    its bounding box in PDF coordinates, grown by CROP_MARGIN on every side
    and clipped to the page. Detection failures yield an empty list.
    """
    zoom = thumb_dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)
    pix = page.get_pixmap(matrix=mat, alpha=False)
    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
    if pix.n == 4:
        img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

    boxes: List[fitz.Rect] = []
    det = cv2.QRCodeDetector()
    try:
        ok, pts = det.detect(img)
        if ok and pts is not None:
            # Normalise to (n_codes, 4 corners, xy) whatever layout the
            # installed OpenCV build returns.
            for poly in np.asarray(pts, dtype=float).reshape(-1, 4, 2):
                xs, ys = poly[:, 0], poly[:, 1]
                x1, y1, x2, y2 = float(xs.min()), float(ys.min()), float(xs.max()), float(ys.max())
                w, h = x2 - x1, y2 - y1
                x1 -= CROP_MARGIN * w
                y1 -= CROP_MARGIN * h
                x2 += CROP_MARGIN * w
                y2 += CROP_MARGIN * h
                rect_pdf = fitz.Rect(x1/zoom, y1/zoom, x2/zoom, y2/zoom) & page.rect
                if not rect_pdf.is_empty:
                    boxes.append(rect_pdf)
    except Exception as e:
        logger.debug(f"Thumbnail detect failed: {e}")
    return boxes


def thumbnail_scan_all_pages(pdf_bytes: bytes, pages: Optional[List[int]] = None,
                             thumb_dpi: int = THUMBNAIL_DPI) -> Dict[int, List[fitz.Rect]]:
    """Scan thumbnails for QR codes.

    Args:
        pdf_bytes: Raw PDF content.
        pages: 1-based page numbers to scan; None or empty scans every page.
            Out-of-range numbers are ignored.
        thumb_dpi: Resolution used for the thumbnails.

    Returns:
        Mapping of 1-based page number to the QR boxes found on that page;
        pages without hits are omitted.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    hits: Dict[int, List[fitz.Rect]] = {}
    try:
        total = doc.page_count
        if pages:
            indices = [p-1 for p in pages if 1 <= p <= total]
        else:
            indices = list(range(total))
        for i in indices:
            page = doc[i]
            rects = detect_boxes_on_thumbnail(page, thumb_dpi)
            if rects:
                hits[i+1] = rects
    finally:
        doc.close()
    return hits


def _scan_pdf_pages(doc, pdf_bytes: bytes, target_pages: List[int], dpi: int,
                    scan_all_first: bool) -> Optional[Dict[str, Any]]:
    """Render candidate pages of *doc* at *dpi* and return the first QR hit."""
    zoom = dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)

    page_boxes_map: Dict[int, List[fitz.Rect]] = {}
    candidate_pages = target_pages
    if scan_all_first:
        page_boxes_map = thumbnail_scan_all_pages(pdf_bytes, pages=target_pages,
                                                  thumb_dpi=THUMBNAIL_DPI)
        # When thumbnails found something, only those pages are rendered.
        if page_boxes_map:
            candidate_pages = list(page_boxes_map.keys())

    for p1 in candidate_pages:
        page = doc[p1-1]

        # Cheap path first: render only the regions flagged on the thumbnail.
        for clip in page_boxes_map.get(p1, []):
            pix = page.get_pixmap(matrix=mat, clip=clip, alpha=False)
            res = extract_qr_from_image(pix.tobytes("png"))
            if res:
                res["_page_number"] = p1
                res["_dpi_used"] = dpi
                res["_clip"] = [clip.x0, clip.y0, clip.x1, clip.y1]
                return res

        pix = page.get_pixmap(matrix=mat, alpha=False)
        res = extract_qr_from_image(pix.tobytes("png"))
        if res:
            res["_page_number"] = p1
            res["_dpi_used"] = dpi
            res["_clip"] = None
            return res
    return None


def extract_qr_from_pdf(pdf_bytes: bytes, dpi: int, pages: Optional[List[int]] = None,
                        scan_all_first: bool = True) -> Optional[Dict[str, Any]]:
    """Extract QR from PDF with E-Invoice preference.

    Args:
        pdf_bytes: Raw PDF content.
        dpi: Rendering resolution for the full-quality pass.
        pages: 1-based page numbers to consider; None or empty means all
            pages. Out-of-range numbers are ignored.
        scan_all_first: Run a low-resolution thumbnail pass first to find
            candidate pages and crop regions.

    Returns:
        The decoded QR payload with ``_page_number``, ``_dpi_used`` and
        ``_clip`` added, or None when nothing is found or rendering fails.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        total = doc.page_count
        if pages:
            target_pages = [p for p in pages if 1 <= p <= total]
        else:
            target_pages = list(range(1, total+1))

        logger.debug(f"[PDF] target_pages={target_pages}, dpi={dpi}")

        # An empty list would make the thumbnail scan fall back to "all
        # pages" and return hits from pages the caller never requested.
        if not target_pages:
            return None

        try:
            return _scan_pdf_pages(doc, pdf_bytes, target_pages, dpi, scan_all_first)
        except Exception as e:
            logger.error(f"[PDF] Processing error: {e}")
            return None
    finally:
        doc.close()


# ============ Processing with Timeout ============
async def process_file_with_timeout(content: bytes, filename: str, dpi: int, pages: str,
                                    scan_all_first: bool) -> Dict[str, Any]:
    """Process file with timeout protection.

    The blocking extraction runs on the shared thread pool. Note that a
    timeout only stops waiting for the result; the worker thread itself
    keeps running until its extraction call returns.

    Args:
        content: Raw file bytes.
        filename: Original file name, used for type sniffing and reporting.
        dpi: Rendering resolution for PDFs.
        pages: Comma-separated 1-based page numbers; non-numeric tokens are
            ignored and an empty string means all pages.
        scan_all_first: Forwarded to extract_qr_from_pdf.

    Returns:
        A dict with ``success``, ``qr_data``, ``message`` and ``filename``,
        plus ``error`` on timeout or failure. Never raises.
    """
    loop = asyncio.get_running_loop()

    try:
        ext = (filename or "").lower().split(".")[-1]
        is_pdf = (ext == "pdf") or content.startswith(b"%PDF")

        if is_pdf:
            page_list: List[int] = []
            if pages.strip():
                for tok in pages.split(","):
                    tok = tok.strip()
                    if tok.isdigit():
                        page_list.append(int(tok))

            result = await asyncio.wait_for(
                loop.run_in_executor(executor, extract_qr_from_pdf, content, dpi,
                                     page_list or None, scan_all_first),
                timeout=PROCESSING_TIMEOUT
            )
        else:
            result = await asyncio.wait_for(
                loop.run_in_executor(executor, extract_qr_from_image, content),
                timeout=PROCESSING_TIMEOUT
            )

        if result:
            return {
                "success": True,
                "qr_data": result,
                "message": "QR code extracted successfully (E-Invoice preferred)",
                "filename": filename
            }
        else:
            return {
                "success": False,
                "qr_data": None,
                "message": "No QR code found in the document",
                "filename": filename
            }

    except asyncio.TimeoutError:
        logger.error(f"Timeout processing {filename}")
        return {
            "success": False,
            "qr_data": None,
            "message": "Processing timeout exceeded",
            "filename": filename,
            "error": "timeout"
        }
    except Exception as e:
        logger.error(f"Error processing {filename}: {e}")
        return {
            "success": False,
            "qr_data": None,
            "message": f"Error processing file: {str(e)}",
            "filename": filename,
            "error": str(e)
        }


# ============ API Endpoints ============
@app.get("/")
def root():
    """Describe the service, its limits and its endpoints."""
    return {
        "service": "E-Invoice QR Extractor API",
        "version": "2.3.0",
        "features": {
            "single_file": True,
            "batch_upload": True,
            "concurrency_limit": MAX_CONCURRENT_REQUESTS,
            "max_file_size": f"{MAX_FILE_SIZE / (1024*1024)}MB",
            "timeout": f"{PROCESSING_TIMEOUT}s"
        },
        "endpoints": {
            "single": "POST /extract-qr",
            "batch": "POST /batch-extract",
            "tracking": "GET /tracking/{request_id}",
            "stats": "GET /stats",
            "health": "GET /health"
        }
    }


@app.post("/extract-qr")
async def extract_qr(
    file: UploadFile = File(...),
    dpi: int = 1200,
    pages: str = "",
    scan_all_first: bool = True
):
    """
    Single file QR extraction with concurrency control.

    Responds 200 with the QR payload, 404 when no QR is found, 413 when the
    upload exceeds MAX_FILE_SIZE and 500 on unexpected errors.
    """
    request_id = str(uuid.uuid4())

    async with processing_semaphore:
        try:
            content = await file.read()
            file_size = len(content)

            if file_size > MAX_FILE_SIZE:
                raise HTTPException(status_code=413, detail="File too large")

            file_hash = get_file_hash(content)
            logger.info(f"[{request_id}] Processing: {file.filename}, size: {file_size} bytes")

            # Track request
            track_request(request_id, {
                "filename": file.filename,
                "file_size": file_size,
                "file_hash": file_hash,
                "dpi": dpi,
                "pages": pages
            })

            # Process with timeout
            result = await process_file_with_timeout(content, file.filename, dpi, pages,
                                                     scan_all_first)

            # Update tracking
            update_request(request_id, "completed" if result["success"] else "no_qr_found",
                           result)

            logger.info(f"[{request_id}] Completed: {result['success']}")

            if result["success"]:
                return JSONResponse(status_code=200, content={
                    "success": True,
                    "qr_data": result["qr_data"],
                    "message": result["message"],
                    "request_id": request_id
                })
            else:
                return JSONResponse(status_code=404, content={
                    "success": False,
                    "qr_data": None,
                    "message": result["message"],
                    "request_id": request_id
                })

        except HTTPException:
            # The only HTTPException raised above is the 413 size check.
            update_request(request_id, "failed", {"error": "File too large"})
            raise
        except Exception as e:
            logger.error(f"[{request_id}] Error: {e}")
            update_request(request_id, "failed", {"error": str(e)})
            raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")


@app.post("/batch-extract")
async def batch_extract(
    files: List[UploadFile] = File(...),
    dpi: int = 1200,
    pages: str = ""
):
    """
    Batch file upload - process multiple files concurrently.
    Returns results for all files.
    """
    batch_id = str(uuid.uuid4())
    logger.info(f"[BATCH {batch_id}] Received {len(files)} files")

    # Process files with semaphore control
    async def process_single(file: UploadFile, index: int):
        """Process one upload; always returns a result dict, never raises."""
        async with processing_semaphore:
            try:
                content = await file.read()
                file_size = len(content)

                if file_size > MAX_FILE_SIZE:
                    return {
                        "filename": file.filename,
                        "index": index,
                        "success": False,
                        "error": "File too large",
                        "file_size": file_size
                    }

                logger.info(f"[BATCH {batch_id}] [{index+1}/{len(files)}] Processing: {file.filename}")

                result = await process_file_with_timeout(content, file.filename, dpi, pages, True)
                result["index"] = index
                result["file_size"] = file_size
                return result

            except Exception as e:
                logger.error(f"[BATCH {batch_id}] Error processing {file.filename}: {e}")
                return {
                    "filename": file.filename,
                    "index": index,
                    "success": False,
                    "error": str(e)
                }

    # Process all files concurrently (limited by semaphore)
    tasks = [process_single(file, i) for i, file in enumerate(files)]
    results = await asyncio.gather(*tasks)

    # Summary
    successful = sum(1 for r in results if r.get("success"))
    failed = len(results) - successful

    logger.info(f"[BATCH {batch_id}] Completed: {successful} successful, {failed} failed")

    return {
        "batch_id": batch_id,
        "total_files": len(files),
        "successful": successful,
        "failed": failed,
        "results": results
    }


@app.get("/tracking/{request_id}")
def get_tracking(request_id: str):
    """Get tracking information for a request."""
    if request_id not in request_tracking:
        raise HTTPException(status_code=404, detail="Request not found")
    return request_tracking[request_id]


@app.get("/stats")
def get_stats():
    """Get current processing statistics."""
    cleanup_old_tracking()

    # Snapshot: this sync endpoint runs in a worker thread while coroutines
    # on the event loop may add entries.
    tracked = list(request_tracking.values())
    total_requests = len(tracked)
    completed = sum(1 for r in tracked if r["status"] == "completed")
    failed = sum(1 for r in tracked if r["status"] == "failed")
    processing = sum(1 for r in tracked if r["status"] == "processing")

    return {
        "current_load": {
            "processing": processing,
            "available_slots": MAX_CONCURRENT_REQUESTS - processing,
            "max_concurrent": MAX_CONCURRENT_REQUESTS
        },
        "statistics": {
            "total_requests": total_requests,
            "completed": completed,
            "failed": failed,
            "no_qr_found": sum(1 for r in tracked if r["status"] == "no_qr_found")
        },
        "configuration": {
            "max_file_size": f"{MAX_FILE_SIZE / (1024*1024)}MB",
            "processing_timeout": f"{PROCESSING_TIMEOUT}s",
            "thumbnail_dpi": THUMBNAIL_DPI,
            "max_concurrent": MAX_CONCURRENT_REQUESTS
        }
    }


@app.get("/health")
def health_check():
    """Health check endpoint."""
    processing = sum(1 for r in list(request_tracking.values())
                     if r["status"] == "processing")
    return {
        "status": "healthy",
        "processing": processing,
        "available_slots": MAX_CONCURRENT_REQUESTS - processing
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info", workers=1)