Spaces:
Sleeping
Sleeping
| from google.oauth2 import id_token as google_id_token | |
| from google.auth.transport import requests as google_requests | |
| from jose import jwt as jose_jwt, JWTError, jwk | |
| from jose.utils import base64url_decode | |
| from typing import Dict, Optional | |
| import httpx | |
| import asyncio | |
| from functools import partial, lru_cache | |
| import logging | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| class TokenVerificationError(Exception): | |
| """Custom exception for token verification errors""" | |
| pass | |
| class FacebookTokenVerifier: | |
| def __init__(self, app_id: str, app_secret: str): | |
| self.app_id = app_id | |
| self.app_secret = app_secret | |
| async def verify_token(self, token: str) -> Dict: | |
| """ | |
| Asynchronously verifies a Facebook access token and returns user data. | |
| """ | |
| try: | |
| # First, verify the token with Facebook's debug endpoint | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| # Verify token validity | |
| debug_url = f"https://graph.facebook.com/debug_token" | |
| debug_params = { | |
| "input_token": token, | |
| "access_token": f"{self.app_id}|{self.app_secret}" | |
| } | |
| debug_response = await client.get(debug_url, params=debug_params) | |
| debug_response.raise_for_status() | |
| debug_data = debug_response.json() | |
| if not debug_data.get("data", {}).get("is_valid"): | |
| raise TokenVerificationError("Invalid Facebook token") | |
| # Check if token is for our app | |
| token_app_id = debug_data.get("data", {}).get("app_id") | |
| if token_app_id != self.app_id: | |
| raise TokenVerificationError("Token not for this app") | |
| # Get user data | |
| user_url = "https://graph.facebook.com/me" | |
| user_params = { | |
| "access_token": token, | |
| "fields": "id,name,email,picture.type(large)" | |
| } | |
| user_response = await client.get(user_url, params=user_params) | |
| user_response.raise_for_status() | |
| user_data = user_response.json() | |
| # Validate required fields | |
| if not user_data.get("id"): | |
| raise TokenVerificationError("Missing user ID in Facebook response") | |
| logger.info(f"Successfully verified Facebook token for user: {user_data.get('email', user_data.get('id'))}") | |
| return user_data | |
| except httpx.RequestError as e: | |
| logger.error(f"Facebook token verification request failed: {str(e)}") | |
| raise TokenVerificationError(f"Facebook API request failed: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Facebook token verification failed: {str(e)}") | |
| raise TokenVerificationError(f"Invalid Facebook token: {str(e)}") | |
| class GoogleTokenVerifier: | |
| def __init__(self, client_id: str): | |
| self.client_id = client_id | |
| async def verify_token(self, token: str) -> Dict: | |
| """ | |
| Asynchronously verifies a Google ID token and returns the payload if valid. | |
| """ | |
| try: | |
| loop = asyncio.get_event_loop() | |
| # Run the sync method in a thread to avoid blocking | |
| idinfo = await loop.run_in_executor( | |
| None, | |
| partial(google_id_token.verify_oauth2_token, token, google_requests.Request(), self.client_id) | |
| ) | |
| # Validate issuer | |
| if idinfo.get('iss') not in ['accounts.google.com', 'https://accounts.google.com']: | |
| raise TokenVerificationError('Invalid issuer') | |
| # Additional validation | |
| if idinfo.get('aud') != self.client_id: | |
| raise TokenVerificationError('Invalid audience') | |
| # Check token expiration (extra safety) | |
| exp = idinfo.get('exp') | |
| if exp and datetime.fromtimestamp(exp) < datetime.utcnow(): | |
| raise TokenVerificationError('Token has expired') | |
| logger.info(f"Successfully verified Google token for user: {idinfo.get('email')}") | |
| return idinfo | |
| except Exception as e: | |
| logger.error(f"Google token verification failed: {str(e)}") | |
| raise TokenVerificationError(f"Invalid Google token: {str(e)}") | |
| class GoogleAccessTokenVerifier: | |
| def __init__(self): | |
| pass | |
| async def verify_access_token(self, access_token: str) -> Dict: | |
| """ | |
| Verify Google OAuth access token by calling the OIDC UserInfo endpoint. | |
| Returns normalized user info containing at least 'sub' and 'email' if available. | |
| """ | |
| try: | |
| headers = {"Authorization": f"Bearer {access_token}"} | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| # Google OIDC UserInfo endpoint | |
| resp = await client.get("https://openidconnect.googleapis.com/v1/userinfo", headers=headers) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| # Basic sanity checks | |
| if not data.get("sub") and not data.get("id"): | |
| raise TokenVerificationError("Missing subject/id in Google userinfo response") | |
| # Normalize shape similar to ID token claims | |
| user_info = { | |
| "sub": data.get("sub") or data.get("id"), | |
| "email": data.get("email"), | |
| "email_verified": data.get("email_verified"), | |
| "name": data.get("name"), | |
| "picture": data.get("picture"), | |
| } | |
| logger.info(f"Successfully verified Google access token for user: {user_info.get('email')}") | |
| return user_info | |
| except httpx.HTTPStatusError as e: | |
| logger.error(f"Google userinfo verification failed: {e.response.text}") | |
| raise TokenVerificationError("Invalid Google access token") | |
| except httpx.RequestError as e: | |
| logger.error(f"Google userinfo request error: {str(e)}") | |
| raise TokenVerificationError(f"Google userinfo request failed: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Google access token verification failed: {str(e)}") | |
| raise TokenVerificationError(f"Invalid Google access token: {str(e)}") | |
| class AppleTokenVerifier: | |
| def __init__(self, audience: str, cache_duration: int = 3600): | |
| self.audience = audience | |
| self.cache_duration = cache_duration | |
| self._keys_cache = None | |
| self._cache_timestamp = None | |
| async def _get_apple_keys(self) -> list: | |
| """ | |
| Fetch Apple's public keys with caching to reduce API calls. | |
| """ | |
| now = datetime.utcnow() | |
| # Check if cache is still valid | |
| if (self._keys_cache and self._cache_timestamp and | |
| now - self._cache_timestamp < timedelta(seconds=self.cache_duration)): | |
| return self._keys_cache | |
| try: | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| response = await client.get('https://appleid.apple.com/auth/keys') | |
| response.raise_for_status() | |
| keys = response.json().get('keys', []) | |
| # Update cache | |
| self._keys_cache = keys | |
| self._cache_timestamp = now | |
| logger.info("Successfully fetched Apple public keys") | |
| return keys | |
| except httpx.RequestError as e: | |
| logger.error(f"Failed to fetch Apple keys: {str(e)}") | |
| raise TokenVerificationError(f"Could not fetch Apple public keys: {str(e)}") | |
| async def verify_token(self, token: str) -> Dict: | |
| """ | |
| Asynchronously verifies an Apple identity token and returns the decoded payload. | |
| """ | |
| try: | |
| # Fetch Apple's public keys | |
| keys = await self._get_apple_keys() | |
| # Decode header to get kid and alg | |
| header = jose_jwt.get_unverified_header(token) | |
| kid = header.get('kid') | |
| alg = header.get('alg') | |
| if not kid or not alg: | |
| raise TokenVerificationError("Token header missing required fields") | |
| # Find matching key | |
| key = next((k for k in keys if k['kid'] == kid and k['alg'] == alg), None) | |
| if not key: | |
| raise TokenVerificationError("Public key not found for Apple token") | |
| # Verify signature manually (additional safety) | |
| public_key = jwk.construct(key) | |
| message, encoded_sig = token.rsplit('.', 1) | |
| decoded_sig = base64url_decode(encoded_sig.encode()) | |
| if not public_key.verify(message.encode(), decoded_sig): | |
| raise TokenVerificationError("Invalid Apple token signature") | |
| # Decode and validate claims | |
| claims = jose_jwt.decode( | |
| token, | |
| key, | |
| algorithms=['RS256'], | |
| audience=self.audience, | |
| issuer='https://appleid.apple.com' | |
| ) | |
| # Additional validation | |
| if claims.get('aud') != self.audience: | |
| raise TokenVerificationError('Invalid audience') | |
| logger.info(f"Successfully verified Apple token for user: {claims.get('sub')}") | |
| return claims | |
| except JWTError as e: | |
| logger.error(f"JWT error during Apple token verification: {str(e)}") | |
| raise TokenVerificationError(f"Invalid Apple token: {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Apple token verification failed: {str(e)}") | |
| raise TokenVerificationError(f"Invalid Apple token: {str(e)}") | |
| # Factory class for easier usage | |
| class OAuthVerifier: | |
| def __init__(self, google_client_id: Optional[str] = None, apple_audience: Optional[str] = None, | |
| facebook_app_id: Optional[str] = None, facebook_app_secret: Optional[str] = None): | |
| self.google_verifier = GoogleTokenVerifier(google_client_id) if google_client_id else None | |
| self.google_access_verifier = GoogleAccessTokenVerifier() | |
| self.apple_verifier = AppleTokenVerifier(apple_audience) if apple_audience else None | |
| self.facebook_verifier = FacebookTokenVerifier(facebook_app_id, facebook_app_secret) if facebook_app_id and facebook_app_secret else None | |
| async def verify_google_token(self, token: str) -> Dict: | |
| if not self.google_verifier: | |
| raise TokenVerificationError("Google verifier not configured") | |
| return await self.google_verifier.verify_token(token) | |
| async def verify_google_access_token(self, token: str) -> Dict: | |
| return await self.google_access_verifier.verify_access_token(token) | |
| async def verify_apple_token(self, token: str) -> Dict: | |
| if not self.apple_verifier: | |
| raise TokenVerificationError("Apple verifier not configured") | |
| return await self.apple_verifier.verify_token(token) | |
| async def verify_facebook_token(self, token: str) -> Dict: | |
| if not self.facebook_verifier: | |
| raise TokenVerificationError("Facebook verifier not configured") | |
| return await self.facebook_verifier.verify_token(token) | |
| # Convenience functions (backward compatibility) | |
| async def verify_google_token(token: str, client_id: str) -> Dict: | |
| """ | |
| Asynchronously verifies a Google ID token and returns the payload if valid. | |
| In local test mode, bypass external verification and synthesize minimal claims. | |
| """ | |
| try: | |
| from app.core.config import settings | |
| if getattr(settings, "OAUTH_TEST_MODE", False): | |
| # Accept anything shaped like a JWT, otherwise synthesize from raw token | |
| sub = "test-google-sub" | |
| email = "test.user@example.com" | |
| if token.count(".") == 2: | |
| # Try to decode header/payload without verification to extract email/sub if present | |
| try: | |
| unverified = jose_jwt.get_unverified_claims(token) | |
| sub = unverified.get("sub", sub) | |
| email = unverified.get("email", email) | |
| except Exception: | |
| pass | |
| return {"sub": sub, "email": email, "aud": client_id} | |
| except Exception: | |
| # Fall through to real verification if settings import fails | |
| pass | |
| verifier = GoogleTokenVerifier(client_id) | |
| return await verifier.verify_token(token) | |
| async def verify_google_access_token(token: str) -> Dict: | |
| """ | |
| Asynchronously verifies a Google OAuth access token via the UserInfo endpoint. | |
| In local test mode, bypass network call and return synthetic user info. | |
| """ | |
| try: | |
| from app.core.config import settings | |
| if getattr(settings, "OAUTH_TEST_MODE", False): | |
| # Strip optional Bearer prefix | |
| t = token | |
| if t.lower().startswith("bearer "): | |
| t = t[7:] | |
| return { | |
| "sub": "test-google-access", | |
| "email": "test.access@example.com", | |
| "email_verified": True, | |
| "name": "Test Access", | |
| "picture": None, | |
| } | |
| except Exception: | |
| pass | |
| verifier = GoogleAccessTokenVerifier() | |
| return await verifier.verify_access_token(token) | |
| async def verify_apple_token(token: str, audience: str) -> Dict: | |
| """ | |
| Asynchronously verifies an Apple identity token and returns the decoded payload. | |
| In local test mode, bypass external verification and synthesize minimal claims. | |
| """ | |
| try: | |
| from app.core.config import settings | |
| if getattr(settings, "OAUTH_TEST_MODE", False): | |
| sub = "test-apple-sub" | |
| if token.count(".") == 2: | |
| try: | |
| unverified = jose_jwt.get_unverified_claims(token) | |
| sub = unverified.get("sub", sub) | |
| except Exception: | |
| pass | |
| return {"sub": sub, "aud": audience} | |
| except Exception: | |
| pass | |
| verifier = AppleTokenVerifier(audience) | |
| return await verifier.verify_token(token) | |
| async def verify_facebook_token(token: str, app_id: str, app_secret: str) -> Dict: | |
| """ | |
| Asynchronously verifies a Facebook access token and returns user data. | |
| In local test mode, bypass network call and return synthetic user data. | |
| """ | |
| try: | |
| from app.core.config import settings | |
| if getattr(settings, "OAUTH_TEST_MODE", False): | |
| return {"id": "test-facebook-id", "name": "Test FB", "email": "fb.test@example.com"} | |
| except Exception: | |
| pass | |
| verifier = FacebookTokenVerifier(app_id, app_secret) | |
| return await verifier.verify_token(token) | |
| # Example usage | |
| async def example_usage(): | |
| # Initialize verifier | |
| oauth_verifier = OAuthVerifier( | |
| google_client_id="your-google-client-id.googleusercontent.com", | |
| apple_audience="your.app.bundle.id" | |
| ) | |
| try: | |
| # Verify Google token | |
| google_claims = await oauth_verifier.verify_google_token("google_id_token_here") | |
| print(f"Google user: {google_claims.get('email')}") | |
| # Verify Apple token | |
| apple_claims = await oauth_verifier.verify_apple_token("apple_id_token_here") | |
| print(f"Apple user: {apple_claims.get('sub')}") | |
| except TokenVerificationError as e: | |
| print(f"Verification failed: {e}") | |
| if __name__ == "__main__": | |
| asyncio.run(example_usage()) |