# aniket9909's picture
# Create app.py
# f816f01 verified
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import numpy as np
from pyzbar import pyzbar
from PIL import Image
import fitz # PyMuPDF
import json
import base64
import logging
import re
import zlib
import asyncio
import warnings
import os
import hashlib
import time
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import uuid
# ============ Suppress Warnings ============
# Silence every Python warning process-wide and set ZBAR_VERBOSE to '0'
# before any decoding work happens.
warnings.filterwarnings('ignore')
os.environ['ZBAR_VERBOSE'] = '0'

# Suppress stderr on Windows
# NOTE(review): only a handle to kernel32 is loaded here; nothing visible in
# this file uses it to actually redirect stderr.
import sys

if sys.platform == 'win32':
    import ctypes

    try:
        kernel32 = ctypes.WinDLL('kernel32')
    except OSError:
        # Best effort: failing to load the DLL must not stop the service,
        # but unrelated errors (KeyboardInterrupt, ...) are no longer hidden.
        pass

# Root logging configuration for the whole process, plus this module's logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - [%(levelname)s] - %(message)s'
)
logger = logging.getLogger(__name__)
# FastAPI application object; the route decorators further down register on it.
app = FastAPI(title="E-Invoice QR Extractor API", version="2.3.0")
# ============ Configuration ============
# When True, pick_einvoice_first may hand back the first decoded QR even if
# nothing was classified as an e-invoice.
ALLOW_PAYMENT_FALLBACK = False
# DPI used to render the PDF page thumbnails on which QR boxes are located.
THUMBNAIL_DPI = 220
# Fraction of a detected box's width/height added on every side before cropping.
CROP_MARGIN = 0.15
MAX_CONCURRENT_REQUESTS = 5 # Limit concurrent processing
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
# Seconds allowed for one file; passed as the asyncio.wait_for timeout.
PROCESSING_TIMEOUT = 300 # 5 minutes
# ============ Concurrency Control ============
# The semaphore gates the endpoint handlers; the thread pool runs the blocking
# image/PDF work off the event loop. Both are sized by MAX_CONCURRENT_REQUESTS.
processing_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
executor = ThreadPoolExecutor(max_workers=MAX_CONCURRENT_REQUESTS)
# ============ In-Memory Tracking (simple) ============
# request_id -> tracking record (metadata, status, timestamps, optional result).
# Lives only in this process' memory.
request_tracking: Dict[str, Dict[str, Any]] = {}
# ============ CORS ============
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# Confirm this is intended; the FastAPI CORS documentation advises listing
# origins explicitly when credentials are allowed.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============ Helper Functions ============
def get_file_hash(content: bytes) -> str:
    """Return the hexadecimal MD5 digest of ``content``.

    The digest serves as a fingerprint of the uploaded bytes (deduplication).
    """
    digest = hashlib.md5()
    digest.update(content)
    return digest.hexdigest()
def track_request(request_id: str, data: Dict[str, Any]):
    """Register a new request in the in-memory tracking table.

    The stored record is a copy of ``data`` with ``created_at`` set to the
    current local time (ISO format) and ``status`` set to ``"processing"``;
    both override same-named keys in ``data``.
    """
    record = dict(data)
    record["created_at"] = datetime.now().isoformat()
    record["status"] = "processing"
    request_tracking[request_id] = record
def update_request(request_id: str, status: str, result: Optional[Dict] = None):
    """Set the status of a tracked request and stamp ``updated_at``.

    Unknown request ids are ignored. ``result`` is stored only when it is
    truthy, so ``None`` and an empty dict leave any earlier result untouched.
    """
    if request_id not in request_tracking:
        return

    record = request_tracking[request_id]
    record["status"] = status
    record["updated_at"] = datetime.now().isoformat()
    if result:
        record["result"] = result
def cleanup_old_tracking():
    """Drop tracking records whose ``created_at`` is more than one hour old."""
    oldest_allowed = time.time() - 3600
    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    expired = [
        req_id
        for req_id, record in request_tracking.items()
        if datetime.fromisoformat(record["created_at"]).timestamp() < oldest_allowed
    ]
    for req_id in expired:
        del request_tracking[req_id]
# ============ Decoding Helpers ============
# Failures that simply mean "the payload is not in the format being tried".
_DECODE_ERRORS = (ValueError, TypeError, RecursionError, zlib.error)


def _json_object_from_bytes(blob: bytes) -> Optional[Dict[str, Any]]:
    """Return the JSON object stored in ``blob`` (plain or zlib-compressed), else None."""
    for inflate in (False, True):
        try:
            data = zlib.decompress(blob) if inflate else blob
            parsed = json.loads(data.decode("utf-8"))
        except _DECODE_ERRORS:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def decode_gst_qr(raw_data: str) -> Dict[str, Any]:
    """Decode GST QR (JSON / JWT-like base64url / Base64 / Base64+zlib).

    The formats are tried in that order and the first one yielding a JSON
    *object* wins. Anything else, including text that parses as a JSON
    scalar or array such as ``"123"`` or ``"[1, 2]"``, is returned wrapped
    as ``{"raw": <text>}`` so that callers can always treat the result as a
    dict.

    Args:
        raw_data: Text read from the QR code; ``None`` or empty is treated as "".

    Returns:
        The decoded JSON object, or ``{"raw": text}`` when no format matched.
        This function does not raise for undecodable input.
    """
    s = raw_data or ""

    # 1) Plain JSON
    try:
        parsed = json.loads(s)
    except _DECODE_ERRORS:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # 2) JWT-like token: the second dot-separated segment is a base64url payload
    try:
        segments = s.split(".")
        if len(segments) >= 2:
            payload = segments[1]
            payload += "=" * (-len(payload) % 4)  # restore stripped padding
            found = _json_object_from_bytes(base64.urlsafe_b64decode(payload))
            if found is not None:
                return found
    except _DECODE_ERRORS:
        pass

    # 3) Plain Base64 (optionally zlib-compressed)
    try:
        padded = s + "=" * (-len(s) % 4)
        found = _json_object_from_bytes(base64.b64decode(padded))
        if found is not None:
            return found
    except _DECODE_ERRORS:
        pass

    return {"raw": s}
# ============ Classification ============
# Pattern for a GSTIN-shaped token: 2 digits, 5 letters, 4 digits, 1 letter,
# 1 digit, the letter Z, then one alphanumeric character.
GSTIN_REGEX = re.compile(r"\b\d{2}[A-Z]{5}\d{4}[A-Z]\dZ[A-Z0-9]\b")


def classify_qr(raw_text: str, decoded_obj: Optional[Dict[str, Any]] = None) -> str:
    """Label a QR payload as ``'einvoice'``, ``'payment'`` or ``'unknown'``.

    Payment markers are checked first and always win. A payload is an
    e-invoice when its decoded object has one of the known field names, when
    the text contains a GSTIN-shaped token, or when the text is at least 200
    characters long.

    Args:
        raw_text: Text read from the QR code (``None`` is treated as "").
        decoded_obj: Already decoded payload; when falsy the text is decoded
            here with ``decode_gst_qr``.
    """
    text = (raw_text or "").strip()
    lowered = text.lower()

    payment_markers = ("paytm", "phonepe", "gpay", "googlepay", "bharatqr", "bhim upi", "upi://collect")
    looks_like_payment = (
        lowered.startswith("upi://")
        or "upi://pay" in lowered
        or text.startswith("000201")
        or any(marker in lowered for marker in payment_markers)
    )
    if looks_like_payment:
        return "payment"

    payload = decoded_obj if decoded_obj else decode_gst_qr(text)
    if isinstance(payload, dict):
        einvoice_fields = {"irn", "sellergstin", "buyergstin", "docno", "docdt", "totinvval", "mainhsncode", "signedqrcode"}
        present = {name.lower() for name in payload}
        if not einvoice_fields.isdisjoint(present):
            return "einvoice"

    # Payment prefixes have already returned above, so a long text needs no
    # further prefix check here.
    if GSTIN_REGEX.search(text) or len(text) >= 200:
        return "einvoice"

    return "unknown"
def pick_einvoice_first(payloads: List[str]) -> Optional[Dict[str, Any]]:
    """Prefer E-Invoice QR over others.

    Every payload is decoded and classified in order; the first one
    classified as ``'einvoice'`` is returned with ``_classification`` set.
    When none qualifies, the first payload is returned only if
    ``ALLOW_PAYMENT_FALLBACK`` is enabled (tagged with its own
    classification and a ``_note``); otherwise the result is ``None``.

    Args:
        payloads: Raw QR texts in detection order.

    Returns:
        The chosen decoded payload as a dict, or ``None``.
    """
    first_any = None
    for raw in payloads:
        dec = decode_gst_qr(raw)
        if not isinstance(dec, dict):
            # A payload that parses as a JSON scalar/array cannot carry the
            # annotation keys below; wrap it instead of failing with TypeError.
            dec = {"raw": raw}
        cls = classify_qr(raw, dec)
        if first_any is None:
            first_any = (dec, cls)
        if cls == "einvoice":
            dec["_classification"] = "einvoice"
            return dec

    if ALLOW_PAYMENT_FALLBACK and first_any:
        dec, cls = first_any
        dec["_classification"] = cls
        dec["_note"] = "No E-Invoice QR; returning first available QR"
        return dec
    return None
# ============ Image Processing ============
def enhance_image_for_qr(image: np.ndarray) -> np.ndarray:
    """Return a binarised copy of a BGR image for a second QR detection pass.

    Pipeline: grayscale -> CLAHE contrast equalisation -> non-local-means
    denoising -> 3x3 sharpening -> adaptive Gaussian threshold.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    contrast_boosted = equalizer.apply(cv2.cvtColor(image, cv2.COLOR_BGR2GRAY))
    smoothed = cv2.fastNlMeansDenoising(contrast_boosted, None, 10, 7, 21)

    # Sharpening kernel: centre weight 9, every neighbour -1.
    sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    crisp = cv2.filter2D(smoothed, -1, sharpen_kernel)

    return cv2.adaptiveThreshold(
        crisp, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
    )
# ============ QR Decoders ============
def detect_all_qr_opencv(image: np.ndarray) -> List[str]:
    """Detect multiple QR codes using OpenCV.

    The multi-code decoder runs first, then the single-code decoder as a
    second chance. Both are best effort: a failure in either is logged at
    debug level and ignored.

    Args:
        image: Image array handed to ``cv2.QRCodeDetector``.

    Returns:
        Non-empty decoded texts, without duplicates, in detection order.
    """
    res: List[str] = []
    det = cv2.QRCodeDetector()

    try:
        # detectAndDecodeMulti returns (retval, decoded_info, points,
        # straight_qrcode). Unpacking it into three names always raised and
        # silently disabled multi-code decoding.
        _, decoded_info, *_ = det.detectAndDecodeMulti(image)
        if isinstance(decoded_info, (list, tuple)):
            for text in decoded_info:
                if text and text not in res:
                    res.append(text)
    except Exception as exc:
        logger.debug("OpenCV multi QR decode failed: %s", exc)

    try:
        text, _, _ = det.detectAndDecode(image)
        if text and text not in res:
            res.append(text)
    except Exception as exc:
        logger.debug("OpenCV single QR decode failed: %s", exc)

    return res
def detect_all_qr_pyzbar(image: np.ndarray) -> List[str]:
    """Decode every QR symbol pyzbar finds in ``image``.

    Python warnings are silenced while decoding. Only symbols of type
    ``QRCODE`` are kept; their bytes are read as UTF-8 with undecodable bytes
    dropped. Any failure yields whatever was collected so far.
    """
    texts: List[str] = []
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            symbols = pyzbar.decode(image)
            for symbol in symbols:
                if symbol.type != 'QRCODE':
                    continue
                try:
                    text = symbol.data.decode('utf-8', errors='ignore')
                except Exception:
                    continue
                if text:
                    texts.append(text)
    except Exception:
        pass
    return texts
def dedupe_keep_order(items: List[str]) -> List[str]:
    """Return ``items`` without repeats, keeping each first occurrence in place."""
    # dict keys are unique and remember insertion order.
    return list(dict.fromkeys(items))
# ============ Image QR Extraction ============
def extract_qr_from_image(image_bytes: bytes) -> Optional[Dict[str, Any]]:
    """Decode the QR codes in an encoded image and pick the e-invoice one.

    Three passes are made, each only if the previous found nothing:
      1. the image as decoded,
      2. the enhanced (binarised) image,
      3. two crops on the right-hand side of the image.

    Returns ``None`` when the bytes are not a decodable image, when no QR is
    found, or when ``pick_einvoice_first`` selects nothing.
    """
    buffer = np.frombuffer(image_bytes, np.uint8)
    frame = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if frame is None:
        return None

    # Pass 1: untouched image, both decoders.
    found: List[str] = detect_all_qr_opencv(frame) + detect_all_qr_pyzbar(frame)

    # Pass 2: enhanced image (OpenCV gets a 3-channel copy, pyzbar the gray one).
    if not found:
        binarised = enhance_image_for_qr(frame)
        binarised_bgr = cv2.cvtColor(binarised, cv2.COLOR_GRAY2BGR)
        found = detect_all_qr_opencv(binarised_bgr) + detect_all_qr_pyzbar(binarised)

    # Pass 3: top-right corner first, then the right-hand strip; stop at the
    # first crop that yields anything.
    if not found:
        height, width = frame.shape[:2]
        top_right = frame[0:int(height*0.45), int(width*0.55):width]
        right_strip = frame[:, int(width*0.66):width]
        for region in (top_right, right_strip):
            if region.size == 0:
                continue
            found = detect_all_qr_opencv(region) + detect_all_qr_pyzbar(region)
            if found:
                break

    unique = dedupe_keep_order(found)
    return pick_einvoice_first(unique) if unique else None
# ============ PDF Processing ============
def detect_boxes_on_thumbnail(page: fitz.Page, thumb_dpi: int = THUMBNAIL_DPI) -> List[fitz.Rect]:
    """Locate QR candidates on a low-resolution render of ``page``.

    The page is rendered at ``thumb_dpi``, ``cv2.QRCodeDetector.detect`` is
    run on it, and each detected quadrilateral becomes a bounding box that is
    grown by ``CROP_MARGIN`` on every side, scaled back to page coordinates
    and clipped to the page rectangle. Detection errors are logged at debug
    level and produce an empty (or partial) list.
    """
    scale = thumb_dpi / 72.0
    pixmap = page.get_pixmap(matrix=fitz.Matrix(scale, scale), alpha=False)
    raster = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(
        pixmap.height, pixmap.width, pixmap.n
    )
    if pixmap.n == 4:
        raster = cv2.cvtColor(raster, cv2.COLOR_BGRA2BGR)

    found: List[fitz.Rect] = []
    detector = cv2.QRCodeDetector()
    try:
        located, corners = detector.detect(raster)
        quads = np.array(corners) if (located and corners is not None) else []
        for quad in quads:
            xs, ys = quad[:, 0], quad[:, 1]
            left, right = float(xs.min()), float(xs.max())
            top, bottom = float(ys.min()), float(ys.max())

            # Margins are proportional to the un-padded box size.
            pad_x = CROP_MARGIN * (right - left)
            pad_y = CROP_MARGIN * (bottom - top)
            left -= pad_x
            top -= pad_y
            right += pad_x
            bottom += pad_y

            clip = fitz.Rect(left/scale, top/scale, right/scale, bottom/scale) & page.rect
            if not clip.is_empty:
                found.append(clip)
    except Exception as e:
        logger.debug(f"Thumbnail detect failed: {e}")
    return found
def thumbnail_scan_all_pages(pdf_bytes: bytes, pages: List[int] = None, thumb_dpi: int = THUMBNAIL_DPI) -> Dict[int, List[fitz.Rect]]:
    """Run the thumbnail QR locator over a PDF and map page numbers to boxes.

    Args:
        pdf_bytes: The PDF file content.
        pages: 1-based page numbers to scan; out-of-range values are dropped.
            When empty or ``None`` every page is scanned.
        thumb_dpi: Render resolution for the thumbnails.

    Returns:
        ``{page_number: [rect, ...]}`` (1-based) containing only pages on
        which at least one box was found.
    """
    document = fitz.open(stream=pdf_bytes, filetype="pdf")
    page_total = document.page_count

    if pages:
        page_numbers = [number for number in pages if 1 <= number <= page_total]
    else:
        page_numbers = list(range(1, page_total + 1))

    boxes_by_page: Dict[int, List[fitz.Rect]] = {}
    try:
        for number in page_numbers:
            found = detect_boxes_on_thumbnail(document[number - 1], thumb_dpi)
            if found:
                boxes_by_page[number] = found
    finally:
        document.close()
    return boxes_by_page
def extract_qr_from_pdf(pdf_bytes: bytes, dpi: int, pages: List[int] = None, scan_all_first: bool = True) -> Optional[Dict[str, Any]]:
    """Extract QR from PDF with E-Invoice preference.

    With ``scan_all_first`` the pages are first scanned as thumbnails; if any
    QR boxes are located, only those pages are processed and each box is
    rendered at ``dpi`` and decoded before the full page is tried. Without
    located boxes (or with ``scan_all_first=False``) every target page is
    rendered in full.

    Args:
        pdf_bytes: The PDF file content.
        dpi: Render resolution used for decoding.
        pages: Optional 1-based page filter; out-of-range numbers are dropped.
        scan_all_first: Whether to run the thumbnail pre-scan.

    Returns:
        The selected QR payload annotated with ``_page_number``,
        ``_dpi_used`` and ``_clip`` (the crop rectangle, or ``None`` for a
        full-page hit); ``None`` when nothing was found or processing failed.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        total = doc.page_count
        if pages:
            target_pages = [p for p in pages if 1 <= p <= total]
            if not target_pages:
                # An explicit filter that matches no page must not fall
                # through to "scan every page" in the thumbnail pre-scan.
                return None
        else:
            target_pages = list(range(1, total+1))
        logger.debug(f"[PDF] target_pages={target_pages}, dpi={dpi}")

        zoom = dpi / 72.0
        mat = fitz.Matrix(zoom, zoom)

        try:
            page_boxes_map: Dict[int, List[fitz.Rect]] = {}
            candidate_pages = target_pages
            if scan_all_first:
                page_boxes_map = thumbnail_scan_all_pages(pdf_bytes, pages=target_pages, thumb_dpi=THUMBNAIL_DPI)
                if page_boxes_map:
                    candidate_pages = list(page_boxes_map.keys())

            for p1 in candidate_pages:
                page = doc[p1-1]
                # Located crops first, then the whole page (clip=None).
                for clip in [*page_boxes_map.get(p1, []), None]:
                    pix = page.get_pixmap(matrix=mat, clip=clip, alpha=False)
                    res = extract_qr_from_image(pix.tobytes("png"))
                    if res:
                        res["_page_number"] = p1
                        res["_dpi_used"] = dpi
                        res["_clip"] = None if clip is None else [clip.x0, clip.y0, clip.x1, clip.y1]
                        return res
            return None
        except Exception as e:
            logger.exception(f"[PDF] Processing error: {e}")
            return None
    finally:
        # Single cleanup point: runs on every exit path, including
        # exceptions that ``except Exception`` does not catch.
        doc.close()
# ============ Processing with Timeout ============
async def process_file_with_timeout(content: bytes, filename: str, dpi: int, pages: str, scan_all_first: bool) -> Dict[str, Any]:
    """Process file with timeout protection.

    The blocking extraction runs on the module's thread pool and is awaited
    with ``PROCESSING_TIMEOUT``. A file is treated as a PDF when its
    extension is ``pdf`` or its content starts with ``%PDF``; everything
    else is handled as an image.

    Args:
        content: Raw file bytes.
        filename: Original file name (may be ``None``).
        dpi: Render resolution for PDF pages.
        pages: Comma-separated 1-based page numbers; tokens that are not
            digits are ignored, and empty or ``None`` means all pages.
        scan_all_first: Forwarded to ``extract_qr_from_pdf``.

    Returns:
        A dict with ``success``, ``qr_data``, ``message`` and ``filename``;
        timeouts and errors additionally carry an ``error`` key. This
        coroutine does not raise for processing failures.
    """
    loop = asyncio.get_running_loop()
    try:
        ext = (filename or "").lower().split(".")[-1]
        is_pdf = (ext == "pdf") or content.startswith(b"%PDF")

        if is_pdf:
            tokens = (tok.strip() for tok in (pages or "").split(","))
            page_list: List[int] = [int(tok) for tok in tokens if tok.isdigit()]
            job = loop.run_in_executor(
                executor, extract_qr_from_pdf, content, dpi, page_list or None, scan_all_first
            )
        else:
            job = loop.run_in_executor(executor, extract_qr_from_image, content)

        # NOTE: on timeout only the awaiting side gives up; a worker thread
        # that has already started cannot be interrupted.
        result = await asyncio.wait_for(job, timeout=PROCESSING_TIMEOUT)

        if result:
            return {
                "success": True,
                "qr_data": result,
                "message": "QR code extracted successfully (E-Invoice preferred)",
                "filename": filename
            }
        return {
            "success": False,
            "qr_data": None,
            "message": "No QR code found in the document",
            "filename": filename
        }
    except asyncio.TimeoutError:
        logger.error(f"Timeout processing {filename}")
        return {
            "success": False,
            "qr_data": None,
            "message": "Processing timeout exceeded",
            "filename": filename,
            "error": "timeout"
        }
    except Exception as e:
        logger.exception(f"Error processing {filename}: {e}")
        return {
            "success": False,
            "qr_data": None,
            "message": f"Error processing file: {str(e)}",
            "filename": filename,
            "error": str(e)
        }
# ============ API Endpoints ============
@app.get("/")
def root():
    """Describe the service: name, version, limits and available endpoints."""
    size_limit_mb = MAX_FILE_SIZE / (1024*1024)

    features = {
        "single_file": True,
        "batch_upload": True,
        "concurrency_limit": MAX_CONCURRENT_REQUESTS,
        "max_file_size": f"{size_limit_mb}MB",
        "timeout": f"{PROCESSING_TIMEOUT}s"
    }
    endpoints = {
        "single": "POST /extract-qr",
        "batch": "POST /batch-extract",
        "tracking": "GET /tracking/{request_id}",
        "stats": "GET /stats",
        "health": "GET /health"
    }
    return {
        "service": "E-Invoice QR Extractor API",
        "version": "2.3.0",
        "features": features,
        "endpoints": endpoints
    }
@app.post("/extract-qr")
async def extract_qr(
    file: UploadFile = File(...),
    dpi: int = 1200,
    pages: str = "",
    scan_all_first: bool = True
):
    """
    Single file QR extraction with concurrency control.

    The upload is read, size-checked, registered in the tracking table and
    processed under the shared semaphore.

    Responses:
        200 - QR extracted.
        404 - the file was processed but no QR code was selected.
        413 - the upload exceeds ``MAX_FILE_SIZE``.
        504 - processing exceeded ``PROCESSING_TIMEOUT``.
        500 - processing failed with an error.

    Every JSON body carries ``success``, ``qr_data``, ``message`` and
    ``request_id``.
    """
    request_id = str(uuid.uuid4())
    async with processing_semaphore:
        try:
            content = await file.read()
            file_size = len(content)
            if file_size > MAX_FILE_SIZE:
                raise HTTPException(status_code=413, detail="File too large")
            file_hash = get_file_hash(content)
            logger.info(f"[{request_id}] Processing: {file.filename}, size: {file_size} bytes")

            # Track request
            track_request(request_id, {
                "filename": file.filename,
                "file_size": file_size,
                "file_hash": file_hash,
                "dpi": dpi,
                "pages": pages
            })

            # Process with timeout
            result = await process_file_with_timeout(content, file.filename, dpi, pages, scan_all_first)

            # A result carrying an "error" key is a failure, not a clean
            # "nothing found"; report it as such to tracking and HTTP.
            error = result.get("error")
            if result["success"]:
                tracking_status, http_status = "completed", 200
            elif error is None:
                tracking_status, http_status = "no_qr_found", 404
            elif error == "timeout":
                tracking_status, http_status = "failed", 504
            else:
                tracking_status, http_status = "failed", 500

            # Update tracking
            update_request(request_id, tracking_status, result)
            logger.info(f"[{request_id}] Completed: {result['success']}")

            return JSONResponse(status_code=http_status, content={
                "success": result["success"],
                "qr_data": result["qr_data"],
                "message": result["message"],
                "request_id": request_id
            })
        except HTTPException as exc:
            # Record the actual reason rather than assuming the size check.
            update_request(request_id, "failed", {"error": exc.detail})
            raise
        except Exception as e:
            logger.error(f"[{request_id}] Error: {e}")
            update_request(request_id, "failed", {"error": str(e)})
            raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}")
@app.post("/batch-extract")
async def batch_extract(
    files: List[UploadFile] = File(...),
    dpi: int = 1200,
    pages: str = ""
):
    """
    Process several uploads in one call.

    Each file is handled by its own task; the shared semaphore caps how many
    run at once. The response lists one result per file, in upload order,
    together with success/failure totals. A failing file never aborts the
    batch; it is reported with ``success: False`` and an ``error``.
    """
    batch_id = str(uuid.uuid4())
    file_count = len(files)
    logger.info(f"[BATCH {batch_id}] Received {file_count} files")

    async def process_single(file: UploadFile, index: int):
        """Read, size-check and process one upload under the semaphore."""
        async with processing_semaphore:
            try:
                content = await file.read()
                file_size = len(content)

                if file_size > MAX_FILE_SIZE:
                    return {
                        "filename": file.filename,
                        "index": index,
                        "success": False,
                        "error": "File too large",
                        "file_size": file_size
                    }

                logger.info(f"[BATCH {batch_id}] [{index+1}/{file_count}] Processing: {file.filename}")
                outcome = await process_file_with_timeout(content, file.filename, dpi, pages, True)
                outcome["index"] = index
                outcome["file_size"] = file_size
                return outcome
            except Exception as e:
                logger.error(f"[BATCH {batch_id}] Error processing {file.filename}: {e}")
                return {
                    "filename": file.filename,
                    "index": index,
                    "success": False,
                    "error": str(e)
                }

    # Fan out; gather keeps the results in the order the files were given.
    results = await asyncio.gather(
        *[process_single(upload, position) for position, upload in enumerate(files)]
    )

    successful = len([entry for entry in results if entry.get("success")])
    failed = len(results) - successful
    logger.info(f"[BATCH {batch_id}] Completed: {successful} successful, {failed} failed")

    return {
        "batch_id": batch_id,
        "total_files": file_count,
        "successful": successful,
        "failed": failed,
        "results": results
    }
@app.get("/tracking/{request_id}")
def get_tracking(request_id: str):
    """Return the stored tracking record for ``request_id`` (404 if unknown)."""
    try:
        return request_tracking[request_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Request not found") from None
@app.get("/stats")
def get_stats():
    """Report load, per-status request counts and configuration.

    Tracking records older than one hour are purged before counting.
    """
    cleanup_old_tracking()

    # One pass over the table, counting only the statuses that are reported.
    tally = {"completed": 0, "failed": 0, "processing": 0, "no_qr_found": 0}
    for record in request_tracking.values():
        state = record["status"]
        if state in tally:
            tally[state] += 1

    current_load = {
        "processing": tally["processing"],
        "available_slots": MAX_CONCURRENT_REQUESTS - tally["processing"],
        "max_concurrent": MAX_CONCURRENT_REQUESTS
    }
    statistics = {
        "total_requests": len(request_tracking),
        "completed": tally["completed"],
        "failed": tally["failed"],
        "no_qr_found": tally["no_qr_found"]
    }
    configuration = {
        "max_file_size": f"{MAX_FILE_SIZE / (1024*1024)}MB",
        "processing_timeout": f"{PROCESSING_TIMEOUT}s",
        "thumbnail_dpi": THUMBNAIL_DPI,
        "max_concurrent": MAX_CONCURRENT_REQUESTS
    }
    return {
        "current_load": current_load,
        "statistics": statistics,
        "configuration": configuration
    }
@app.get("/health")
def health_check():
    """Liveness probe that also reports how many tracked requests are in flight."""
    in_flight = len(
        [record for record in request_tracking.values() if record["status"] == "processing"]
    )
    return {
        "status": "healthy",
        "processing": in_flight,
        "available_slots": MAX_CONCURRENT_REQUESTS - in_flight
    }
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run directly.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "log_level": "info",
        "workers": 1,
    }
    uvicorn.run(app, **server_options)