import jwt from jwt import PyJWKClient from fastapi import Depends, HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from core.subscription.checker import SubscriptionChecker security = HTTPBearer(auto_error=False) def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): """Middleware dla uwierzytelniania z Clerk lub jwt testowego""" if not credentials: raise HTTPException(status_code=401, detail="Missing Authorization Header") token = credentials.credentials if token == "dev_test_token": return {"sub": "test_dev_user", "tenant": "dev_tenant"} try: # Dekodujemy token bez weryfikacji, aby pobrać adres wystawcy (issuer) unverified_claims = jwt.decode(token, options={"verify_signature": False}) issuer = unverified_claims.get("iss") if not issuer: raise HTTPException(status_code=401, detail="Token missing 'iss' claim") jwks_url = f"{issuer.rstrip('/')}/.well-known/jwks.json" # PyJWKClient automatycznie cache'uje JWKS jwks_client = PyJWKClient(jwks_url) signing_key = jwks_client.get_signing_key_from_jwt(token) # Pełna weryfikacja kryptograficzna dla środowiska produkcyjnego decoded = jwt.decode( token, signing_key.key, algorithms=["RS256"], issuer=issuer, options={"verify_signature": True, "verify_aud": False}, ) return decoded except Exception as e: raise HTTPException( status_code=401, detail=f"Błąd dostępu lub nieprawidłowy token: {str(e)}" ) def check_api_quota(token_data: dict = Depends(verify_token)): """Middleware blokujący, jeśli wykorzystano limit tokenów""" user_id = token_data.get("sub", "anonymous") checker = SubscriptionChecker(user_id=user_id) if not checker.can_use_tokens(): raise HTTPException( status_code=403, detail="Przekroczono limit tokenów dla Twojego planu. Zaktualizuj subskrypcję.", ) return token_data