Spaces:
Running
Running
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, validator | |
| from slowapi import Limiter | |
| from slowapi.util import get_remote_address | |
| from slowapi.errors import RateLimitExceeded | |
| from fastapi.responses import JSONResponse | |
| import os | |
| import re | |
| import hmac | |
| import hashlib | |
| import base64 | |
| import json | |
| from blacklist_layer_mobile import BlacklistLayer | |
| from ai_layer_mobile import AIAnalyzer | |
| app = FastAPI(title="QR Guard API", version="1.0") | |
| # إعداد معدل التقييد (Rate Limiting) لمنع هجمات DoS | |
| limiter = Limiter(key_func=get_remote_address) | |
| app.state.limiter = limiter | |
| # التعامل مع استثناء تجاوز حد الطلبات | |
| async def rate_limit_handler(request: Request, exc: RateLimitExceeded): | |
| return JSONResponse( | |
| status_code=429, | |
| content={"detail": "Rate limit exceeded. Please slow down your requests."}, | |
| ) | |
| # تهيئة سياسة CORS: السماح فقط لنطاق محدد - استبدل بالدامين الخاص بالواجهة الأمامية | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["https://your-frontend-domain.com"], # غيّر هذا للنطاق المرغوب فقط | |
| allow_methods=["GET", "POST"], | |
| allow_headers=["*"], | |
| ) | |
| print("[INIT] QR Guard Server Starting...") | |
| # قراءة مفاتيح الأمان من متغيرات البيئة | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") | |
| SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key") # مفتاح سري للتوقيع الرقمي، يجب تأمينه | |
| blacklist = BlacklistLayer(api_key=GOOGLE_API_KEY) | |
| print("[OK] Blacklist Layer Ready") | |
| ai = AIAnalyzer(model_path="/app") | |
| print("[OK] AI Analyzer Ready") | |
| print("[READY] Server is running!") | |
| # موديل الإدخال مع تحقق URL متقدم | |
| class URLRequest(BaseModel): | |
| url: str | |
| def validate_url(cls, v): | |
| if not v or not v.strip(): | |
| raise ValueError("URL cannot be empty") | |
| if len(v) > 2048: | |
| raise ValueError("URL exceeds maximum allowed length of 2048 characters") | |
| if not re.match(r'^https?://', v, re.IGNORECASE): | |
| raise ValueError("URL must begin with http:// or https://") | |
| blocked_patterns = [ | |
| r'localhost', r'127\.0\.0\.1', r'192\.168\.', | |
| r'10\.0\.', r'172\.(1[6-9]|2[0-9]|3[0-1])\.' | |
| ] | |
| for pattern in blocked_patterns: | |
| if re.search(pattern, v, re.IGNORECASE): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Private or local network URLs are not permitted" | |
| ) | |
| return v.strip() | |
| def sign_response(payload: str) -> str: | |
| """ | |
| إنشاء توقيع رقمي HMAC-SHA256 على الرسالة لضمان سلامة البيانات | |
| """ | |
| signature = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).digest() | |
| return base64.b64encode(signature).decode() | |
| def root(): | |
| return { | |
| "status": "QR Guard API is running", | |
| "version": "1.0", | |
| "layers": ["Google Safe Browsing", "AI Analysis"] | |
| } | |
| # تقييد الطلبات: 10 طلبات للدقيقة لكل IP | |
| def scan(req: URLRequest, request: Request): | |
| url = req.url | |
| print(f"[SCAN] {url}") | |
| # بعد اجتياز التحقق من صحة URL، استدعاء طبقة القائمة السوداء | |
| is_bad, msg = blacklist.check_url(url) | |
| print(f"[Layer 1] Blacklist: {msg}") | |
| # استدعاء طبقة الذكاء الاصطناعي مع fallback في حالة الخطأ | |
| try: | |
| ai_result = ai.analyze_url(url) | |
| print(f"[Layer 2] AI: {ai_result}") | |
| except Exception as e: | |
| print(f"[WARN] AI error: {e}") | |
| ai_result = "SAFE" # fallback للسلامة | |
| safe = (not is_bad) and (ai_result == "SAFE") | |
| # تكوين النتيجة النهائية | |
| result = { | |
| "url": url, | |
| "safe": safe, | |
| "google_status": "Threat" if is_bad else "Clean", | |
| "ai_status": "High Risk" if ai_result == "DANGEROUS" else "Safe", | |
| } | |
| # إنشاء التوقيع الرقمي للنتيجة لضمان عدم التلاعب من العميل | |
| result_json = json.dumps(result, sort_keys=True) | |
| signature = sign_response(result_json) | |
| response_body = { | |
| "result": result, | |
| "signature": signature, | |
| } | |
| print(f"[RESULT] {response_body}") | |
| return response_body |