qr-guard-api / server.py
norah-sudo's picture
Update server.py
b2513d8 verified
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse
import os
import re
import hmac
import hashlib
import base64
import json
from blacklist_layer_mobile import BlacklistLayer
from ai_layer_mobile import AIAnalyzer
app = FastAPI(title="QR Guard API", version="1.0")
# إعداد معدل التقييد (Rate Limiting) لمنع هجمات DoS
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# التعامل مع استثناء تجاوز حد الطلبات
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Please slow down your requests."},
)
# تهيئة سياسة CORS: السماح فقط لنطاق محدد - استبدل بالدامين الخاص بالواجهة الأمامية
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-frontend-domain.com"], # غيّر هذا للنطاق المرغوب فقط
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
print("[INIT] QR Guard Server Starting...")
# قراءة مفاتيح الأمان من متغيرات البيئة
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") # مفتاح سري للتوقيع الرقمي، يجب تأمينه
blacklist = BlacklistLayer(api_key=GOOGLE_API_KEY)
print("[OK] Blacklist Layer Ready")
ai = AIAnalyzer(model_path="/app")
print("[OK] AI Analyzer Ready")
print("[READY] Server is running!")
# موديل الإدخال مع تحقق URL متقدم
class URLRequest(BaseModel):
url: str
@validator('url')
def validate_url(cls, v):
if not v or not v.strip():
raise ValueError("URL cannot be empty")
if len(v) > 2048:
raise ValueError("URL exceeds maximum allowed length of 2048 characters")
if not re.match(r'^https?://', v, re.IGNORECASE):
raise ValueError("URL must begin with http:// or https://")
blocked_patterns = [
r'localhost', r'127\.0\.0\.1', r'192\.168\.',
r'10\.0\.', r'172\.(1[6-9]|2[0-9]|3[0-1])\.'
]
for pattern in blocked_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise HTTPException(
status_code=400,
detail="Private or local network URLs are not permitted"
)
return v.strip()
def sign_response(payload: str) -> str:
"""
إنشاء توقيع رقمي HMAC-SHA256 على الرسالة لضمان سلامة البيانات
"""
signature = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).digest()
return base64.b64encode(signature).decode()
@app.get("/")
def root():
return {
"status": "QR Guard API is running",
"version": "1.0",
"layers": ["Google Safe Browsing", "AI Analysis"]
}
@app.post("/scan")
@limiter.limit("10/minute") # تقييد الطلبات: 10 طلبات للدقيقة لكل IP
def scan(req: URLRequest, request: Request):
url = req.url
print(f"[SCAN] {url}")
# بعد اجتياز التحقق من صحة URL، استدعاء طبقة القائمة السوداء
is_bad, msg = blacklist.check_url(url)
print(f"[Layer 1] Blacklist: {msg}")
# استدعاء طبقة الذكاء الاصطناعي مع fallback في حالة الخطأ
try:
ai_result = ai.analyze_url(url)
print(f"[Layer 2] AI: {ai_result}")
except Exception as e:
print(f"[WARN] AI error: {e}")
ai_result = "SAFE" # fallback للسلامة
safe = (not is_bad) and (ai_result == "SAFE")
# تكوين النتيجة النهائية
result = {
"url": url,
"safe": safe,
"google_status": "Threat" if is_bad else "Clean",
"ai_status": "High Risk" if ai_result == "DANGEROUS" else "Safe",
}
# إنشاء التوقيع الرقمي للنتيجة لضمان عدم التلاعب من العميل
result_json = json.dumps(result, sort_keys=True)
signature = sign_response(result_json)
response_body = {
"result": result,
"signature": signature,
}
print(f"[RESULT] {response_body}")
return response_body