GrantForge Bot
Deploy to Hugging Face
afd56bc
import jwt
from jwt import PyJWKClient
from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from core.subscription.checker import SubscriptionChecker
security = HTTPBearer(auto_error=False)
def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
"""Middleware dla uwierzytelniania z Clerk lub jwt testowego"""
if not credentials:
raise HTTPException(status_code=401, detail="Missing Authorization Header")
token = credentials.credentials
if token == "dev_test_token":
return {"sub": "test_dev_user", "tenant": "dev_tenant"}
try:
# Dekodujemy token bez weryfikacji, aby pobra膰 adres wystawcy (issuer)
unverified_claims = jwt.decode(token, options={"verify_signature": False})
issuer = unverified_claims.get("iss")
if not issuer:
raise HTTPException(status_code=401, detail="Token missing 'iss' claim")
jwks_url = f"{issuer.rstrip('/')}/.well-known/jwks.json"
# PyJWKClient automatycznie cache'uje JWKS
jwks_client = PyJWKClient(jwks_url)
signing_key = jwks_client.get_signing_key_from_jwt(token)
# Pe艂na weryfikacja kryptograficzna dla 艣rodowiska produkcyjnego
decoded = jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
issuer=issuer,
options={"verify_signature": True, "verify_aud": False},
)
return decoded
except Exception as e:
raise HTTPException(
status_code=401, detail=f"B艂膮d dost臋pu lub nieprawid艂owy token: {str(e)}"
)
def check_api_quota(token_data: dict = Depends(verify_token)):
"""Middleware blokuj膮cy, je艣li wykorzystano limit token贸w"""
user_id = token_data.get("sub", "anonymous")
checker = SubscriptionChecker(user_id=user_id)
if not checker.can_use_tokens():
raise HTTPException(
status_code=403,
detail="Przekroczono limit token贸w dla Twojego planu. Zaktualizuj subskrypcj臋.",
)
return token_data