from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, validator from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from fastapi.responses import JSONResponse import os import re import hmac import hashlib import base64 import json from blacklist_layer_mobile import BlacklistLayer from ai_layer_mobile import AIAnalyzer app = FastAPI(title="QR Guard API", version="1.0") # إعداد معدل التقييد (Rate Limiting) لمنع هجمات DoS limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter # التعامل مع استثناء تجاوز حد الطلبات @app.exception_handler(RateLimitExceeded) async def rate_limit_handler(request: Request, exc: RateLimitExceeded): return JSONResponse( status_code=429, content={"detail": "Rate limit exceeded. Please slow down your requests."}, ) # تهيئة سياسة CORS: السماح فقط لنطاق محدد - استبدل بالدامين الخاص بالواجهة الأمامية app.add_middleware( CORSMiddleware, allow_origins=["https://your-frontend-domain.com"], # غيّر هذا للنطاق المرغوب فقط allow_methods=["GET", "POST"], allow_headers=["*"], ) print("[INIT] QR Guard Server Starting...") # قراءة مفاتيح الأمان من متغيرات البيئة GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") # مفتاح سري للتوقيع الرقمي، يجب تأمينه blacklist = BlacklistLayer(api_key=GOOGLE_API_KEY) print("[OK] Blacklist Layer Ready") ai = AIAnalyzer(model_path="/app") print("[OK] AI Analyzer Ready") print("[READY] Server is running!") # موديل الإدخال مع تحقق URL متقدم class URLRequest(BaseModel): url: str @validator('url') def validate_url(cls, v): if not v or not v.strip(): raise ValueError("URL cannot be empty") if len(v) > 2048: raise ValueError("URL exceeds maximum allowed length of 2048 characters") if not re.match(r'^https?://', v, re.IGNORECASE): raise ValueError("URL must begin with http:// or https://") blocked_patterns = [ r'localhost', r'127\.0\.0\.1', r'192\.168\.', r'10\.0\.', r'172\.(1[6-9]|2[0-9]|3[0-1])\.' ] for pattern in blocked_patterns: if re.search(pattern, v, re.IGNORECASE): raise HTTPException( status_code=400, detail="Private or local network URLs are not permitted" ) return v.strip() def sign_response(payload: str) -> str: """ إنشاء توقيع رقمي HMAC-SHA256 على الرسالة لضمان سلامة البيانات """ signature = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).digest() return base64.b64encode(signature).decode() @app.get("/") def root(): return { "status": "QR Guard API is running", "version": "1.0", "layers": ["Google Safe Browsing", "AI Analysis"] } @app.post("/scan") @limiter.limit("10/minute") # تقييد الطلبات: 10 طلبات للدقيقة لكل IP def scan(req: URLRequest, request: Request): url = req.url print(f"[SCAN] {url}") # بعد اجتياز التحقق من صحة URL، استدعاء طبقة القائمة السوداء is_bad, msg = blacklist.check_url(url) print(f"[Layer 1] Blacklist: {msg}") # استدعاء طبقة الذكاء الاصطناعي مع fallback في حالة الخطأ try: ai_result = ai.analyze_url(url) print(f"[Layer 2] AI: {ai_result}") except Exception as e: print(f"[WARN] AI error: {e}") ai_result = "SAFE" # fallback للسلامة safe = (not is_bad) and (ai_result == "SAFE") # تكوين النتيجة النهائية result = { "url": url, "safe": safe, "google_status": "Threat" if is_bad else "Clean", "ai_status": "High Risk" if ai_result == "DANGEROUS" else "Safe", } # إنشاء التوقيع الرقمي للنتيجة لضمان عدم التلاعب من العميل result_json = json.dumps(result, sort_keys=True) signature = sign_response(result_json) response_body = { "result": result, "signature": signature, } print(f"[RESULT] {response_body}") return response_body