Spaces:
No application file
No application file
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import cv2 | |
| import numpy as np | |
| from sqlalchemy import create_engine, Column, Integer, String, DateTime, LargeBinary | |
| from sqlalchemy.orm import sessionmaker, declarative_base | |
| from datetime import datetime | |
| import os | |
| import uvicorn | |
| # Initialize FastAPI app properly for Hugging Face Spaces | |
# ASGI application object; the title, description and version below are the
# metadata handed to FastAPI for this service.
app = FastAPI(
    version="1.0",
    title="QR Code Monitoring & Compliance System",
    description="Real-time QR code monitoring and compliance system",
)
| # Database Setup (using SQLite by default for Hugging Face Spaces) | |
# Connection string: taken from the DATABASE_URL environment variable when it
# is set, otherwise a SQLite file in the working directory.
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///./qrmonitoring.db")

# Declarative base class shared by every ORM model in this module.
Base = declarative_base()
class Whitelist(Base):
    """ORM model for the ``whitelist`` table of approved QR payloads.

    ``validate_qr`` treats a decoded QR string as authorized when it equals
    the ``upi_id`` of some row in this table.
    """

    __tablename__ = "whitelist"

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Approved identifier; at most one row per value (unique constraint).
    upi_id = Column(String, unique=True)
    # Optional merchant reference stored alongside the identifier.
    merchant_id = Column(String)
    # Row creation time; filled by datetime.utcnow when not supplied.
    added_at = Column(DateTime, default=datetime.utcnow)
class EventLog(Base):
    """ORM model for the ``event_logs`` table: one row per logged QR event.

    ``log_event`` constructs rows with ``qr_data``, ``threat_type``,
    ``threat_severity``, ``threat_message`` and ``image_data``;
    ``get_alerts`` reads the same attributes back.
    """

    __tablename__ = 'event_logs'

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Time the event was recorded; filled by datetime.utcnow when not supplied.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Decoded QR payload ("" when the code could not be decoded).
    qr_data = Column(String)
    # Threat classification fields copied from the threat dict.
    threat_type = Column(String)
    threat_severity = Column(String)
    # Human-readable threat description. This column was missing even though
    # log_event() passes threat_message=... and get_alerts() reads it, so the
    # EventLog constructor raised TypeError and no event was ever persisted.
    # NOTE: create_all() does not alter existing tables; a database file
    # created before this column existed must be migrated or recreated.
    threat_message = Column(String)
    # JPEG-encoded copy of the processed frame.
    image_data = Column(LargeBinary)
    # Review state of the event; new rows start as "pending".
    status = Column(String, default="pending")
| # Initialize database | |
# Engine bound to the configured database URL.
engine = create_engine(DATABASE_URL)

# Session factory: sessions neither autoflush nor autocommit, so callers must
# commit explicitly.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Create any tables declared on Base that do not exist yet (runs at import).
Base.metadata.create_all(engine)
def get_db():
    """Yield a fresh ``SessionLocal`` session and close it afterwards.

    This is a generator: the session is created on the first ``next()`` and
    closed in the ``finally`` clause when the generator is resumed, closed or
    garbage-collected.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
| # QR Detection using OpenCV | |
def detect_qrs(image_data: bytes):
    """Detect and decode every QR code in an encoded image using OpenCV.

    Args:
        image_data: Raw bytes of an encoded image, as accepted by
            ``cv2.imdecode``.

    Returns:
        On success, a list (possibly empty) with one dict per detected code::

            {"data": <decoded text, "" if it could not be decoded>,
             "location": {"x1": int, "y1": int, "x2": int, "y2": int}}

        where ``location`` is the axis-aligned bounding box of the code's
        corner points, in pixels. On failure, a dict ``{"error": <message>}``.
    """
    try:
        buffer = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
        if image is None:
            # imdecode reports an undecodable buffer by returning None.
            return {"error": "Could not decode image data"}

        detector = cv2.QRCodeDetector()
        # The single-code detectAndDecode() can never report more than one
        # QR, which made the "multiple_qrs" threat unreachable downstream.
        _, decoded, corners, _ = detector.detectAndDecodeMulti(image)
        if corners is None or len(corners) == 0:
            return []

        # Keep detected-but-undecodable codes (empty text), as before.
        texts = list(decoded or ())
        texts += [""] * (len(corners) - len(texts))

        qrs = []
        for text, quad in zip(texts, corners):
            quad = quad.astype(int)
            xs, ys = quad[:, 0], quad[:, 1]
            qrs.append({
                "data": text,
                # Plain ints (not NumPy scalars) so the dict is JSON-friendly.
                "location": {
                    "x1": int(xs.min()),
                    "y1": int(ys.min()),
                    "x2": int(xs.max()),
                    "y2": int(ys.max()),
                },
            })
        return qrs
    except Exception as e:
        # Callers expect failures as an {"error": ...} dict, not an exception.
        return {"error": str(e)}
| # QR Validation | |
def validate_qr(qr_data: str, db):
    """Report whether a decoded QR payload matches a whitelist entry.

    Args:
        qr_data: Decoded QR text; compared against ``Whitelist.upi_id``.
        db: Session used to query the whitelist.

    Returns:
        ``(is_valid, reason)`` where ``reason`` is ``"Empty QR data"``,
        ``"Authorized"`` or ``"Unauthorized QR"``.
    """
    if not qr_data:
        # Falsy payloads are rejected without touching the database.
        return False, "Empty QR data"

    match = db.query(Whitelist).filter(Whitelist.upi_id == qr_data).first()
    if match:
        return True, "Authorized"
    return False, "Unauthorized QR"
| # Threat Detection | |
def check_threats(image_data: bytes, qrs: list, db):
    """Build the list of threats for the QR codes found in one frame.

    Args:
        image_data: Raw frame bytes (not used by this function).
        qrs: Detected QR dicts; each one's ``"data"`` entry is validated.
        db: Session passed through to ``validate_qr``.

    Returns:
        A non-empty list of ``{"type", "severity", "message"}`` dicts: a
        ``multiple_qrs`` entry when more than one code is present, one
        ``unauthorized_qr`` entry per code that fails validation, a single
        ``no_threats`` entry when nothing was flagged, or a single ``error``
        entry when any exception occurred.
    """
    try:
        found = []

        if len(qrs) > 1:
            found.append({
                "type": "multiple_qrs",
                "severity": "high",
                "message": "Multiple QRs detected in frame"
            })

        for entry in qrs:
            ok, why = validate_qr(entry.get("data", ""), db)
            if ok:
                continue
            found.append({
                "type": "unauthorized_qr",
                "severity": "high",
                "message": f"Unauthorized QR detected: {why}"
            })

        if found:
            return found
        return [{
            "type": "no_threats",
            "severity": "info",
            "message": "No threats detected"
        }]
    except Exception as e:
        print(f"Error in threat detection: {e}")
        failure = {
            "type": "error",
            "severity": "critical",
            "message": f"Error in threat detection: {str(e)}"
        }
        return [failure]
| # Alert Dispatcher | |
def send_alert(threat: dict, image_data: bytes):
    """Simulate dispatching an alert by printing it.

    Args:
        threat: Threat dict; its ``"message"`` entry is printed.
        image_data: Raw frame bytes (not used by this function).

    Returns:
        ``{"status": "Alert sent", "threat": threat, "timestamp": <ISO UTC time>}``.
    """
    sent_at = datetime.utcnow().isoformat()
    message = threat['message']
    print(f"ALERT: {message} at {sent_at}")
    return dict(status="Alert sent", threat=threat, timestamp=sent_at)
| # Event Logging | |
def log_event(db, image_data: bytes, qr_data: str, threat: dict):
    """Persist one event row, storing the frame re-encoded as JPEG.

    Args:
        db: Session used to add and commit the row.
        image_data: Raw bytes of the encoded frame.
        qr_data: Decoded QR text to store.
        threat: Dict providing ``"type"``, ``"severity"`` and ``"message"``.

    Returns:
        ``True`` when the row was committed; ``False`` when anything failed
        (the session is rolled back and the error is printed).

    NOTE(review): the ``EventLog`` model must declare a ``threat_message``
    column for the constructor call below to succeed.
    """
    try:
        frame = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
        # imencode returns (success_flag, buffer); only the buffer is used.
        jpeg_bytes = cv2.imencode('.jpg', frame)[1].tobytes()

        record = EventLog(
            qr_data=qr_data,
            threat_type=threat["type"],
            threat_severity=threat["severity"],
            threat_message=threat["message"],
            image_data=jpeg_bytes,
        )
        db.add(record)
        db.commit()
    except Exception as e:
        db.rollback()
        print(f"Error logging event: {e}")
        return False
    return True
| # API Endpoints | |
# Registered explicitly: without a route decorator FastAPI never exposes
# this handler, and no other registration is visible in this module.
@app.post("/process_frame")
async def process_frame(file: UploadFile = File(...)):
    """Process an uploaded video frame: detect, validate, alert and log QR codes.

    Args:
        file: Uploaded image file containing the frame.

    Returns:
        ``{"status": "processed", "qr_count": int, "results": [...], "threats": [...]}``
        where each result pairs a decoded ``qr_data`` string with the threat
        selected for the frame.

    Raises:
        HTTPException: 400 when QR detection reports an error; 500 for any
            other failure while processing.
    """
    db = next(get_db())
    try:
        image_data = await file.read()
        qrs = detect_qrs(image_data)
        # detect_qrs signals failure with an {"error": ...} dict, not a list.
        if isinstance(qrs, dict):
            raise HTTPException(
                status_code=400,
                detail=qrs.get("error", "QR detection failed"),
            )

        threats = check_threats(image_data, qrs, db)

        # The selected threat is the same for every QR in the frame, so pick
        # it once: the first unauthorized/multiple threat, else the last entry.
        threat = next(
            (t for t in threats if "unauthorized" in t["type"] or "multiple" in t["type"]),
            threats[-1]
        )
        should_alert = threat["type"] not in ("no_threats", "info")

        results = []
        for qr in qrs:
            qr_data = qr.get("data", "")
            if should_alert:
                send_alert(threat, image_data)
            log_event(db, image_data, qr_data, threat)
            results.append({
                "qr_data": qr_data,
                "threat": threat
            })

        return {
            "status": "processed",
            "qr_count": len(qrs),
            "results": results,
            "threats": threats
        }
    except HTTPException:
        # Let deliberate HTTP errors (the 400 above) through; the generic
        # handler below would otherwise turn them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        db.close()
# Registered explicitly: without a route decorator FastAPI never exposes
# this handler, and no other registration is visible in this module.
@app.get("/alerts")
async def get_alerts(limit: int = 100):
    """Fetch the most recent logged events for the dashboard.

    Args:
        limit: Maximum number of rows to return, newest first.

    Returns:
        A list of dicts with ``timestamp`` (ISO string, or ``None`` when the
        row has no timestamp), ``threat_type``, ``severity``, ``message``,
        ``qr_data`` and ``image_available``.
    """
    db = next(get_db())
    try:
        alerts = (
            db.query(EventLog)
            .order_by(EventLog.timestamp.desc())
            .limit(limit)
            .all()
        )
        return [
            {
                # Guard against rows stored with a NULL timestamp.
                "timestamp": alert.timestamp.isoformat() if alert.timestamp else None,
                "threat_type": alert.threat_type,
                "severity": alert.threat_severity,
                "message": alert.threat_message,
                "qr_data": alert.qr_data,
                "image_available": bool(alert.image_data)
            }
            for alert in alerts
        ]
    finally:
        db.close()
# Registered explicitly: without a route decorator FastAPI never exposes
# this handler, and no other registration is visible in this module.
@app.post("/whitelist")
async def add_to_whitelist(upi_id: str, merchant_id: str = None):
    """Add a new identifier to the QR whitelist.

    Args:
        upi_id: Identifier to approve; must be at least 5 characters long.
        merchant_id: Optional merchant reference stored with the entry.

    Returns:
        ``{"status": "success", "upi_id": ..., "merchant_id": ...}``.

    Raises:
        HTTPException: 400 when ``upi_id`` is missing, too short, or already
            whitelisted; 500 when the database operation fails.
    """
    db = next(get_db())
    try:
        if not upi_id or len(upi_id) < 5:
            raise HTTPException(status_code=400, detail="Invalid UPI ID")

        existing = db.query(Whitelist).filter(Whitelist.upi_id == upi_id).first()
        if existing:
            raise HTTPException(status_code=400, detail="UPI ID already exists")

        new_entry = Whitelist(upi_id=upi_id, merchant_id=merchant_id)
        db.add(new_entry)
        db.commit()
        return {
            "status": "success",
            "upi_id": upi_id,
            "merchant_id": merchant_id
        }
    except HTTPException:
        # Validation errors above are already proper HTTP responses.
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        db.close()
# Registered explicitly: without a route decorator FastAPI never exposes
# this handler, and no other registration is visible in this module.
@app.get("/health")
async def health_check():
    """Health check endpoint: report whether the database answers a query.

    Returns:
        ``{"status": "healthy", "database": "connected", "timestamp": ...}``
        when ``SELECT 1`` succeeds, otherwise
        ``{"status": "degraded", "database": "disconnected", "error": ..., "timestamp": ...}``.
    """
    # Local import keeps this block self-contained; text() is required because
    # SQLAlchemy 2.x rejects raw SQL strings passed to Session.execute().
    from sqlalchemy import text

    # Pre-bind so the finally clause is safe even if session creation fails.
    db = None
    try:
        db = next(get_db())
        db.execute(text("SELECT 1"))
        return {
            "status": "healthy",
            "database": "connected",
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        return {
            "status": "degraded",
            "database": "disconnected",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }
    finally:
        if db is not None:
            db.close()
| # Required for Hugging Face Spaces deployment | |
if __name__ == "__main__":
    # Listen on all interfaces. The port defaults to 8080 as before but can
    # be overridden with the PORT environment variable, so the hosting
    # platform's expected port can be matched without editing the code.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8080")))