Spaces:
Running
Running
File size: 4,626 Bytes
b2513d8 93bcdb2 01b6081 b2513d8 93bcdb2 24dc042 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b4d4de1 93bcdb2 b2513d8 93bcdb2 01b6081 24dc042 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 93bcdb2 b2513d8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse
import os
import re
import hmac
import hashlib
import base64
import json
from blacklist_layer_mobile import BlacklistLayer
from ai_layer_mobile import AIAnalyzer
app = FastAPI(title="QR Guard API", version="1.0")
# إعداد معدل التقييد (Rate Limiting) لمنع هجمات DoS
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# التعامل مع استثناء تجاوز حد الطلبات
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Please slow down your requests."},
)
# تهيئة سياسة CORS: السماح فقط لنطاق محدد - استبدل بالدامين الخاص بالواجهة الأمامية
app.add_middleware(
CORSMiddleware,
allow_origins=["https://your-frontend-domain.com"], # غيّر هذا للنطاق المرغوب فقط
allow_methods=["GET", "POST"],
allow_headers=["*"],
)
print("[INIT] QR Guard Server Starting...")
# قراءة مفاتيح الأمان من متغيرات البيئة
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") # مفتاح سري للتوقيع الرقمي، يجب تأمينه
blacklist = BlacklistLayer(api_key=GOOGLE_API_KEY)
print("[OK] Blacklist Layer Ready")
ai = AIAnalyzer(model_path="/app")
print("[OK] AI Analyzer Ready")
print("[READY] Server is running!")
# موديل الإدخال مع تحقق URL متقدم
class URLRequest(BaseModel):
url: str
@validator('url')
def validate_url(cls, v):
if not v or not v.strip():
raise ValueError("URL cannot be empty")
if len(v) > 2048:
raise ValueError("URL exceeds maximum allowed length of 2048 characters")
if not re.match(r'^https?://', v, re.IGNORECASE):
raise ValueError("URL must begin with http:// or https://")
blocked_patterns = [
r'localhost', r'127\.0\.0\.1', r'192\.168\.',
r'10\.0\.', r'172\.(1[6-9]|2[0-9]|3[0-1])\.'
]
for pattern in blocked_patterns:
if re.search(pattern, v, re.IGNORECASE):
raise HTTPException(
status_code=400,
detail="Private or local network URLs are not permitted"
)
return v.strip()
def sign_response(payload: str) -> str:
"""
إنشاء توقيع رقمي HMAC-SHA256 على الرسالة لضمان سلامة البيانات
"""
signature = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).digest()
return base64.b64encode(signature).decode()
@app.get("/")
def root():
return {
"status": "QR Guard API is running",
"version": "1.0",
"layers": ["Google Safe Browsing", "AI Analysis"]
}
@app.post("/scan")
@limiter.limit("10/minute") # تقييد الطلبات: 10 طلبات للدقيقة لكل IP
def scan(req: URLRequest, request: Request):
url = req.url
print(f"[SCAN] {url}")
# بعد اجتياز التحقق من صحة URL، استدعاء طبقة القائمة السوداء
is_bad, msg = blacklist.check_url(url)
print(f"[Layer 1] Blacklist: {msg}")
# استدعاء طبقة الذكاء الاصطناعي مع fallback في حالة الخطأ
try:
ai_result = ai.analyze_url(url)
print(f"[Layer 2] AI: {ai_result}")
except Exception as e:
print(f"[WARN] AI error: {e}")
ai_result = "SAFE" # fallback للسلامة
safe = (not is_bad) and (ai_result == "SAFE")
# تكوين النتيجة النهائية
result = {
"url": url,
"safe": safe,
"google_status": "Threat" if is_bad else "Clean",
"ai_status": "High Risk" if ai_result == "DANGEROUS" else "Safe",
}
# إنشاء التوقيع الرقمي للنتيجة لضمان عدم التلاعب من العميل
result_json = json.dumps(result, sort_keys=True)
signature = sign_response(result_json)
response_body = {
"result": result,
"signature": signature,
}
print(f"[RESULT] {response_body}")
return response_body |