File size: 2,459 Bytes
16b2195
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74cacc0
16b2195
 
 
74cacc0
16b2195
 
 
 
 
 
 
 
74cacc0
16b2195
 
 
 
 
 
 
 
 
74cacc0
16b2195
 
 
 
 
 
 
 
74cacc0
16b2195
 
 
 
 
 
 
 
 
 
74cacc0
16b2195
74cacc0
16b2195
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""Bearer token authentication and URL validation (SSRF protection)."""

import ipaddress
import secrets
import socket
from urllib.parse import urlparse

from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import API_TOKEN, BLOCKED_HOSTNAMES

security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Verify the API token from Authorization header."""
    if not API_TOKEN:
        raise HTTPException(status_code=500, detail="No API token configured on server")

    token = credentials.credentials
    if not secrets.compare_digest(token, API_TOKEN):
        raise HTTPException(status_code=401, detail="Invalid API token")
    return token


def _validate_url(url: str) -> None:
    """Validate URL to prevent SSRF attacks."""
    try:
        parsed = urlparse(url)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid URL format: {str(e)}")

    if parsed.scheme not in ("http", "https"):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed.",
        )

    hostname = parsed.hostname
    if not hostname:
        raise HTTPException(status_code=400, detail="Invalid URL: missing hostname.")

    hostname_lower = hostname.lower()
    if hostname_lower in BLOCKED_HOSTNAMES:
        raise HTTPException(
            status_code=400,
            detail="Access to internal/metadata services is not allowed.",
        )

    for pattern in ("metadata", "internal", "localhost", "127.0.0.1", "::1"):
        if pattern in hostname_lower:
            raise HTTPException(
                status_code=400,
                detail="Access to internal/metadata services is not allowed.",
            )

    try:
        ip_str = socket.gethostbyname(hostname)
        ip = ipaddress.ip_address(ip_str)
    except socket.gaierror:
        raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {hostname}")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid IP address resolved: {str(e)}")

    if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved or ip.is_multicast:
        raise HTTPException(
            status_code=400,
            detail="Access to private/internal IP addresses is not allowed.",
        )