QR-DETECTOR1 / app.py
Sivainti's picture
Create app.py
2db93d1 verified
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import cv2
import numpy as np
from sqlalchemy import create_engine, Column, Integer, String, DateTime, LargeBinary
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime
import os
import uvicorn
# Initialize FastAPI app properly for Hugging Face Spaces
app = FastAPI(
title="QR Code Monitoring & Compliance System",
description="Real-time QR code monitoring and compliance system",
version="1.0"
)
# Database Setup (using SQLite by default for Hugging Face Spaces)
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./qrmonitoring.db")
Base = declarative_base()
class Whitelist(Base):
__tablename__ = 'whitelist'
id = Column(Integer, primary_key=True)
upi_id = Column(String, unique=True)
merchant_id = Column(String)
added_at = Column(DateTime, default=datetime.utcnow)
class EventLog(Base):
__tablename__ = 'event_logs'
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.utcnow)
qr_data = Column(String)
threat_type = Column(String)
threat_severity = Column(String)
image_data = Column(LargeBinary)
status = Column(String, default="pending")
# Initialize database
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create tables if they don't exist
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# QR Detection using OpenCV
def detect_qrs(image_data: bytes):
"""Detect QR codes in an image using OpenCV."""
try:
nparr = np.frombuffer(image_data, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
detector = cv2.QRCodeDetector()
data, vertices, _ = detector.detectAndDecode(image)
qrs = []
if vertices is not None:
vertices = vertices[0].astype(int)
x_coords = vertices[:, 0]
y_coords = vertices[:, 1]
x1, x2 = min(x_coords), max(x_coords)
y1, y2 = min(y_coords), max(y_coords)
qrs.append({
"data": data,
"location": {"x1": x1, "y1": y1, "x2": x2, "y2": y2}
})
return qrs
except Exception as e:
return {"error": str(e)}
# QR Validation
def validate_qr(qr_data: str, db):
"""Check if QR code data is in the whitelist."""
if not qr_data:
return False, "Empty QR data"
qr_entry = db.query(Whitelist).filter(Whitelist.upi_id == qr_data).first()
return bool(qr_entry), "Unauthorized QR" if not qr_entry else "Authorized"
# Threat Detection
def check_threats(image_data: bytes, qrs: list, db):
"""Check for threats like multiple QRs or unauthorized QRs."""
threats = []
try:
if len(qrs) > 1:
threats.append({
"type": "multiple_qrs",
"severity": "high",
"message": "Multiple QRs detected in frame"
})
for qr in qrs:
qr_data = qr.get("data", "")
is_valid, reason = validate_qr(qr_data, db)
if not is_valid:
threats.append({
"type": "unauthorized_qr",
"severity": "high",
"message": f"Unauthorized QR detected: {reason}"
})
if not threats:
threats.append({
"type": "no_threats",
"severity": "info",
"message": "No threats detected"
})
return threats
except Exception as e:
print(f"Error in threat detection: {e}")
return [{
"type": "error",
"severity": "critical",
"message": f"Error in threat detection: {str(e)}"
}]
# Alert Dispatcher
def send_alert(threat: dict, image_data: bytes):
"""Simulate sending an alert."""
timestamp = datetime.utcnow().isoformat()
print(f"ALERT: {threat['message']} at {timestamp}")
return {
"status": "Alert sent",
"threat": threat,
"timestamp": timestamp
}
# Event Logging
def log_event(db, image_data: bytes, qr_data: str, threat: dict):
"""Log event details to database."""
try:
nparr = np.frombuffer(image_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
_, img_encoded = cv2.imencode('.jpg', img)
img_bytes = img_encoded.tobytes()
event = EventLog(
qr_data=qr_data,
threat_type=threat["type"],
threat_severity=threat["severity"],
threat_message=threat["message"],
image_data=img_bytes
)
db.add(event)
db.commit()
return True
except Exception as e:
db.rollback()
print(f"Error logging event: {e}")
return False
# API Endpoints
@app.post("/process_frame")
async def process_frame(file: UploadFile = File(...)):
"""Process a video frame to detect and validate QR codes."""
db = next(get_db())
try:
image_data = await file.read()
qrs = detect_qrs(image_data)
if "error" in qrs:
raise HTTPException(status_code=400, detail=qrs["error"])
threats = check_threats(image_data, qrs, db)
results = []
for qr in qrs:
qr_data = qr.get("data", "")
threat = next(
(t for t in threats if "unauthorized" in t["type"] or "multiple" in t["type"]),
threats[-1]
)
if threat["type"] not in ["no_threats", "info"]:
send_alert(threat, image_data)
log_event(db, image_data, qr_data, threat)
results.append({
"qr_data": qr_data,
"threat": threat
})
return {
"status": "processed",
"qr_count": len(qrs),
"results": results,
"threats": threats
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@app.get("/alerts")
async def get_alerts(limit: int = 100):
"""Fetch logged alerts for the dashboard."""
db = next(get_db())
try:
alerts = db.query(EventLog)\
.order_by(EventLog.timestamp.desc())\
.limit(limit)\
.all()
return [
{
"timestamp": alert.timestamp.isoformat(),
"threat_type": alert.threat_type,
"severity": alert.threat_severity,
"message": alert.threat_message,
"qr_data": alert.qr_data,
"image_available": bool(alert.image_data)
}
for alert in alerts
]
finally:
db.close()
@app.post("/whitelist")
async def add_to_whitelist(upi_id: str, merchant_id: str = None):
"""Add a new QR code to the whitelist."""
db = next(get_db())
try:
if not upi_id or len(upi_id) < 5:
raise HTTPException(status_code=400, detail="Invalid UPI ID")
existing = db.query(Whitelist).filter(Whitelist.upi_id == upi_id).first()
if existing:
raise HTTPException(status_code=400, detail="UPI ID already exists")
new_entry = Whitelist(upi_id=upi_id, merchant_id=merchant_id)
db.add(new_entry)
db.commit()
return {
"status": "success",
"upi_id": upi_id,
"merchant_id": merchant_id
}
except HTTPException:
raise
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=str(e))
finally:
db.close()
@app.get("/health")
async def health_check():
"""Health check endpoint."""
try:
db = next(get_db())
db.execute("SELECT 1")
return {
"status": "healthy",
"database": "connected",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
return {
"status": "degraded",
"database": "disconnected",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
finally:
db.close()
# Required for Hugging Face Spaces deployment
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)