File size: 2,142 Bytes
afd56bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import jwt
from jwt import PyJWKClient
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from core.subscription.checker import SubscriptionChecker

security = HTTPBearer(auto_error=False)


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Middleware dla uwierzytelniania z Clerk lub jwt testowego"""
    if not credentials:
        raise HTTPException(status_code=401, detail="Missing Authorization Header")

    token = credentials.credentials
    if token == "dev_test_token":
        return {"sub": "test_dev_user", "tenant": "dev_tenant"}

    try:
        # Dekodujemy token bez weryfikacji, aby pobrać adres wystawcy (issuer)
        unverified_claims = jwt.decode(token, options={"verify_signature": False})
        issuer = unverified_claims.get("iss")

        if not issuer:
            raise HTTPException(status_code=401, detail="Token missing 'iss' claim")

        jwks_url = f"{issuer.rstrip('/')}/.well-known/jwks.json"

        # PyJWKClient automatycznie cache'uje JWKS
        jwks_client = PyJWKClient(jwks_url)
        signing_key = jwks_client.get_signing_key_from_jwt(token)

        # Pełna weryfikacja kryptograficzna dla środowiska produkcyjnego
        decoded = jwt.decode(
            token,
            signing_key.key,
            algorithms=["RS256"],
            issuer=issuer,
            options={"verify_signature": True, "verify_aud": False},
        )
        return decoded
    except Exception as e:
        raise HTTPException(
            status_code=401, detail=f"Błąd dostępu lub nieprawidłowy token: {str(e)}"
        )


def check_api_quota(token_data: dict = Depends(verify_token)):
    """Middleware blokujący, jeśli wykorzystano limit tokenów"""
    user_id = token_data.get("sub", "anonymous")
    checker = SubscriptionChecker(user_id=user_id)

    if not checker.can_use_tokens():
        raise HTTPException(
            status_code=403,
            detail="Przekroczono limit tokenów dla Twojego planu. Zaktualizuj subskrypcję.",
        )
    return token_data