File size: 4,626 Bytes
b2513d8
93bcdb2
01b6081
b2513d8
 
 
 
 
93bcdb2
24dc042
b2513d8
 
 
 
93bcdb2
 
 
 
 
 
b2513d8
 
 
 
 
 
 
 
 
 
 
 
 
93bcdb2
 
b2513d8
 
93bcdb2
 
 
 
 
b2513d8
93bcdb2
b2513d8
 
93bcdb2
 
 
b4d4de1
93bcdb2
 
 
 
 
b2513d8
93bcdb2
 
 
01b6081
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24dc042
 
b2513d8
 
 
 
 
 
 
 
93bcdb2
 
 
 
 
 
 
 
 
 
b2513d8
 
93bcdb2
 
 
b2513d8
93bcdb2
 
 
b2513d8
93bcdb2
 
 
 
 
b2513d8
93bcdb2
 
 
b2513d8
93bcdb2
 
 
 
 
 
b2513d8
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, validator
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse

import os
import re
import hmac
import hashlib
import base64
import json

from blacklist_layer_mobile import BlacklistLayer
from ai_layer_mobile import AIAnalyzer

app = FastAPI(title="QR Guard API", version="1.0")

# إعداد معدل التقييد (Rate Limiting) لمنع هجمات DoS
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

# التعامل مع استثناء تجاوز حد الطلبات
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded. Please slow down your requests."},
    )

# تهيئة سياسة CORS: السماح فقط لنطاق محدد - استبدل بالدامين الخاص بالواجهة الأمامية
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-frontend-domain.com"],  # غيّر هذا للنطاق المرغوب فقط
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

print("[INIT] QR Guard Server Starting...")

# قراءة مفاتيح الأمان من متغيرات البيئة
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")  # مفتاح سري للتوقيع الرقمي، يجب تأمينه

blacklist = BlacklistLayer(api_key=GOOGLE_API_KEY)
print("[OK] Blacklist Layer Ready")

ai = AIAnalyzer(model_path="/app")
print("[OK] AI Analyzer Ready")

print("[READY] Server is running!")


# موديل الإدخال مع تحقق URL متقدم
class URLRequest(BaseModel):
    url: str

    @validator('url')
    def validate_url(cls, v):
        if not v or not v.strip():
            raise ValueError("URL cannot be empty")

        if len(v) > 2048:
            raise ValueError("URL exceeds maximum allowed length of 2048 characters")

        if not re.match(r'^https?://', v, re.IGNORECASE):
            raise ValueError("URL must begin with http:// or https://")

        blocked_patterns = [
            r'localhost', r'127\.0\.0\.1', r'192\.168\.',
            r'10\.0\.', r'172\.(1[6-9]|2[0-9]|3[0-1])\.'
        ]
        for pattern in blocked_patterns:
            if re.search(pattern, v, re.IGNORECASE):
                raise HTTPException(
                    status_code=400,
                    detail="Private or local network URLs are not permitted"
                )
        return v.strip()


def sign_response(payload: str) -> str:
    """
    إنشاء توقيع رقمي HMAC-SHA256 على الرسالة لضمان سلامة البيانات
    """
    signature = hmac.new(SECRET_KEY.encode(), payload.encode(), hashlib.sha256).digest()
    return base64.b64encode(signature).decode()


@app.get("/")
def root():
    return {
        "status": "QR Guard API is running",
        "version": "1.0",
        "layers": ["Google Safe Browsing", "AI Analysis"]
    }


@app.post("/scan")
@limiter.limit("10/minute")  # تقييد الطلبات: 10 طلبات للدقيقة لكل IP
def scan(req: URLRequest, request: Request):
    url = req.url
    print(f"[SCAN] {url}")

    # بعد اجتياز التحقق من صحة URL، استدعاء طبقة القائمة السوداء
    is_bad, msg = blacklist.check_url(url)
    print(f"[Layer 1] Blacklist: {msg}")

    # استدعاء طبقة الذكاء الاصطناعي مع fallback في حالة الخطأ
    try:
        ai_result = ai.analyze_url(url)
        print(f"[Layer 2] AI: {ai_result}")
    except Exception as e:
        print(f"[WARN] AI error: {e}")
        ai_result = "SAFE"  # fallback للسلامة

    safe = (not is_bad) and (ai_result == "SAFE")

    # تكوين النتيجة النهائية
    result = {
        "url": url,
        "safe": safe,
        "google_status": "Threat" if is_bad else "Clean",
        "ai_status": "High Risk" if ai_result == "DANGEROUS" else "Safe",
    }

    # إنشاء التوقيع الرقمي للنتيجة لضمان عدم التلاعب من العميل
    result_json = json.dumps(result, sort_keys=True)
    signature = sign_response(result_json)

    response_body = {
        "result": result,
        "signature": signature,
    }

    print(f"[RESULT] {response_body}")

    return response_body