"""Verification helpers for third-party OAuth / OpenID Connect tokens.

Provides one verifier class per identity provider (Facebook, Google ID token,
Google access token, Apple), an ``OAuthVerifier`` facade that bundles them, and
module-level convenience coroutines kept for backward compatibility.

Every verifier raises ``TokenVerificationError`` on failure.
"""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from functools import partial, lru_cache
from typing import Dict, Optional

import httpx
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token as google_id_token
from jose import jwt as jose_jwt, JWTError, jwk
from jose.utils import base64url_decode

logger = logging.getLogger(__name__)


class TokenVerificationError(Exception):
    """Custom exception for token verification errors"""
    pass


class FacebookTokenVerifier:
    """Verify Facebook user access tokens through the Graph API."""

    def __init__(self, app_id: str, app_secret: str):
        self.app_id = app_id
        self.app_secret = app_secret

    async def verify_token(self, token: str) -> Dict:
        """
        Asynchronously verifies a Facebook access token and returns user data.

        The token is first checked with the ``debug_token`` endpoint (validity
        and owning app), then ``/me`` is queried for the profile fields.

        Returns:
            The JSON object returned by ``/me`` (contains at least ``id``).

        Raises:
            TokenVerificationError: on any validation, HTTP or network failure.
        """
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                # Verify token validity using the app access token (id|secret).
                debug_params = {
                    "input_token": token,
                    "access_token": f"{self.app_id}|{self.app_secret}",
                }
                debug_response = await client.get(
                    "https://graph.facebook.com/debug_token", params=debug_params
                )
                debug_response.raise_for_status()
                debug_data = debug_response.json()

                if not debug_data.get("data", {}).get("is_valid"):
                    raise TokenVerificationError("Invalid Facebook token")

                # Check if token is for our app. Compare as strings so an
                # integer-configured app_id still matches the API's string value.
                token_app_id = debug_data.get("data", {}).get("app_id")
                if token_app_id is None or str(token_app_id) != str(self.app_id):
                    raise TokenVerificationError("Token not for this app")

                # Get user data
                user_params = {
                    "access_token": token,
                    "fields": "id,name,email,picture.type(large)",
                }
                user_response = await client.get(
                    "https://graph.facebook.com/me", params=user_params
                )
                user_response.raise_for_status()
                user_data = user_response.json()

            # Validate required fields
            if not user_data.get("id"):
                raise TokenVerificationError("Missing user ID in Facebook response")

            logger.info(
                "Successfully verified Facebook token for user: %s",
                user_data.get('email', user_data.get('id')),
            )
            return user_data

        except httpx.HTTPStatusError as e:
            # str(e) embeds the full request URL, whose query string carries the
            # app secret and the user's token. Report only the status code so
            # neither ends up in logs or in the error returned to callers.
            status = e.response.status_code
            logger.error(
                "Facebook token verification failed: Graph API returned HTTP %s", status
            )
            raise TokenVerificationError(
                f"Invalid Facebook token: Graph API returned HTTP {status}"
            ) from None
        except httpx.RequestError as e:
            logger.error("Facebook token verification request failed: %s", e)
            raise TokenVerificationError(f"Facebook API request failed: {str(e)}") from e
        except Exception as e:
            logger.error("Facebook token verification failed: %s", e)
            raise TokenVerificationError(f"Invalid Facebook token: {str(e)}") from e


class GoogleTokenVerifier:
    """Verify Google-issued OpenID Connect ID tokens."""

    def __init__(self, client_id: str):
        self.client_id = client_id

    async def verify_token(self, token: str) -> Dict:
        """
        Asynchronously verifies a Google ID token and returns the payload if valid.

        Raises:
            TokenVerificationError: if the library rejects the token or the
                issuer, audience or expiry checks fail.
        """
        try:
            # get_running_loop() is the supported way to obtain the loop from
            # inside a coroutine; get_event_loop() is deprecated for this use.
            loop = asyncio.get_running_loop()

            # Run the sync method in a thread to avoid blocking
            idinfo = await loop.run_in_executor(
                None,
                partial(
                    google_id_token.verify_oauth2_token,
                    token,
                    google_requests.Request(),
                    self.client_id,
                ),
            )

            # Validate issuer
            if idinfo.get('iss') not in ['accounts.google.com', 'https://accounts.google.com']:
                raise TokenVerificationError('Invalid issuer')

            # Additional validation
            if idinfo.get('aud') != self.client_id:
                raise TokenVerificationError('Invalid audience')

            # Check token expiration (extra safety). `exp` is seconds since the
            # epoch in UTC, so compare timezone-aware UTC values on both sides;
            # a naive fromtimestamp() would be interpreted in local time.
            exp = idinfo.get('exp')
            if exp and datetime.fromtimestamp(exp, tz=timezone.utc) < datetime.now(timezone.utc):
                raise TokenVerificationError('Token has expired')

            logger.info("Successfully verified Google token for user: %s", idinfo.get('email'))
            return idinfo

        except Exception as e:
            logger.error("Google token verification failed: %s", e)
            raise TokenVerificationError(f"Invalid Google token: {str(e)}") from e


class GoogleAccessTokenVerifier:
    """Verify Google OAuth access tokens via the OIDC UserInfo endpoint."""

    def __init__(self):
        pass

    async def verify_access_token(self, access_token: str) -> Dict:
        """
        Verify Google OAuth access token by calling the OIDC UserInfo endpoint.
        Returns normalized user info containing at least 'sub' and 'email' if available.

        Raises:
            TokenVerificationError: on HTTP error status, network failure, or a
                response lacking both ``sub`` and ``id``.
        """
        # NOTE(review): a successful UserInfo call shows the token is accepted
        # by Google, but nothing here checks which OAuth client it was issued
        # to. Confirm whether callers need an audience check as well.
        try:
            headers = {"Authorization": f"Bearer {access_token}"}
            async with httpx.AsyncClient(timeout=10.0) as client:
                # Google OIDC UserInfo endpoint
                resp = await client.get(
                    "https://openidconnect.googleapis.com/v1/userinfo", headers=headers
                )
                resp.raise_for_status()
                data = resp.json()

            # Basic sanity checks
            if not data.get("sub") and not data.get("id"):
                raise TokenVerificationError("Missing subject/id in Google userinfo response")

            # Normalize shape similar to ID token claims
            user_info = {
                "sub": data.get("sub") or data.get("id"),
                "email": data.get("email"),
                "email_verified": data.get("email_verified"),
                "name": data.get("name"),
                "picture": data.get("picture"),
            }
            logger.info(
                "Successfully verified Google access token for user: %s",
                user_info.get('email'),
            )
            return user_info
        except httpx.HTTPStatusError as e:
            logger.error("Google userinfo verification failed: %s", e.response.text)
            raise TokenVerificationError("Invalid Google access token") from e
        except httpx.RequestError as e:
            logger.error("Google userinfo request error: %s", e)
            raise TokenVerificationError(f"Google userinfo request failed: {str(e)}") from e
        except Exception as e:
            logger.error("Google access token verification failed: %s", e)
            raise TokenVerificationError(f"Invalid Google access token: {str(e)}") from e


class AppleTokenVerifier:
    """Verify "Sign in with Apple" identity tokens against Apple's JWKS."""

    def __init__(self, audience: str, cache_duration: int = 3600):
        self.audience = audience
        self.cache_duration = cache_duration  # seconds a fetched key set is reused
        self._keys_cache = None
        self._cache_timestamp = None

    async def _get_apple_keys(self) -> list:
        """
        Fetch Apple's public keys with caching to reduce API calls.

        Returns:
            The ``keys`` list from Apple's JWKS document (possibly empty).

        Raises:
            TokenVerificationError: if the keys cannot be downloaded.
        """
        now = datetime.now(timezone.utc)

        # Check if cache is still valid
        if (
            self._keys_cache
            and self._cache_timestamp
            and now - self._cache_timestamp < timedelta(seconds=self.cache_duration)
        ):
            return self._keys_cache

        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get('https://appleid.apple.com/auth/keys')
                response.raise_for_status()
                keys = response.json().get('keys', [])
        except httpx.HTTPError as e:
            # HTTPError covers both transport failures (RequestError) and
            # non-2xx responses (HTTPStatusError raised by raise_for_status).
            logger.error("Failed to fetch Apple keys: %s", e)
            raise TokenVerificationError(f"Could not fetch Apple public keys: {str(e)}") from e

        # Update cache
        self._keys_cache = keys
        self._cache_timestamp = now

        logger.info("Successfully fetched Apple public keys")
        return keys

    async def verify_token(self, token: str) -> Dict:
        """
        Asynchronously verifies an Apple identity token and returns the decoded payload.

        Raises:
            TokenVerificationError: if no matching public key exists or the
                signature, audience, issuer or other claims are invalid.
        """
        try:
            # Fetch Apple's public keys
            keys = await self._get_apple_keys()

            # Decode header to get kid and alg
            header = jose_jwt.get_unverified_header(token)
            kid = header.get('kid')
            alg = header.get('alg')

            if not kid or not alg:
                raise TokenVerificationError("Token header missing required fields")

            # Find matching key. Use .get so a JWK entry lacking 'kid' or 'alg'
            # is skipped instead of raising KeyError.
            key = next(
                (k for k in keys if k.get('kid') == kid and k.get('alg') == alg),
                None,
            )
            if not key:
                raise TokenVerificationError("Public key not found for Apple token")

            # Verify signature manually (additional safety)
            public_key = jwk.construct(key)
            message, encoded_sig = token.rsplit('.', 1)
            decoded_sig = base64url_decode(encoded_sig.encode())

            if not public_key.verify(message.encode(), decoded_sig):
                raise TokenVerificationError("Invalid Apple token signature")

            # Decode and validate claims
            claims = jose_jwt.decode(
                token,
                key,
                algorithms=['RS256'],
                audience=self.audience,
                issuer='https://appleid.apple.com',
            )

            # Additional validation
            if claims.get('aud') != self.audience:
                raise TokenVerificationError('Invalid audience')

            logger.info("Successfully verified Apple token for user: %s", claims.get('sub'))
            return claims

        except JWTError as e:
            logger.error("JWT error during Apple token verification: %s", e)
            raise TokenVerificationError(f"Invalid Apple token: {str(e)}") from e
        except Exception as e:
            logger.error("Apple token verification failed: %s", e)
            raise TokenVerificationError(f"Invalid Apple token: {str(e)}") from e


# Factory class for easier usage
class OAuthVerifier:
    """Bundle the provider verifiers; unconfigured providers are left as None."""

    def __init__(self,
                 google_client_id: Optional[str] = None,
                 apple_audience: Optional[str] = None,
                 facebook_app_id: Optional[str] = None,
                 facebook_app_secret: Optional[str] = None):
        self.google_verifier = GoogleTokenVerifier(google_client_id) if google_client_id else None
        self.google_access_verifier = GoogleAccessTokenVerifier()
        self.apple_verifier = AppleTokenVerifier(apple_audience) if apple_audience else None
        self.facebook_verifier = (
            FacebookTokenVerifier(facebook_app_id, facebook_app_secret)
            if facebook_app_id and facebook_app_secret
            else None
        )

    async def verify_google_token(self, token: str) -> Dict:
        """Verify a Google ID token; raises if Google is not configured."""
        if not self.google_verifier:
            raise TokenVerificationError("Google verifier not configured")
        return await self.google_verifier.verify_token(token)

    async def verify_google_access_token(self, token: str) -> Dict:
        """Verify a Google OAuth access token via the UserInfo endpoint."""
        return await self.google_access_verifier.verify_access_token(token)

    async def verify_apple_token(self, token: str) -> Dict:
        """Verify an Apple identity token; raises if Apple is not configured."""
        if not self.apple_verifier:
            raise TokenVerificationError("Apple verifier not configured")
        return await self.apple_verifier.verify_token(token)

    async def verify_facebook_token(self, token: str) -> Dict:
        """Verify a Facebook access token; raises if Facebook is not configured."""
        if not self.facebook_verifier:
            raise TokenVerificationError("Facebook verifier not configured")
        return await self.facebook_verifier.verify_token(token)


def _oauth_test_mode_enabled() -> bool:
    """Return True when ``settings.OAUTH_TEST_MODE`` is set to a truthy value.

    SECURITY: test mode makes the convenience functions below skip all real
    verification and return synthetic claims. It must never be enabled in a
    production deployment.
    """
    try:
        from app.core.config import settings
        return bool(getattr(settings, "OAUTH_TEST_MODE", False))
    except Exception:
        # Deliberate best-effort: if the settings module cannot be loaded,
        # fall through to real verification rather than failing.
        return False


def _peek_unverified_claims(token) -> Dict:
    """Best-effort read of a JWT-shaped token's claims WITHOUT verification.

    Used only by the test-mode bypass. Returns an empty dict when the token is
    not JWT-shaped or cannot be decoded.
    """
    try:
        if token.count(".") == 2:
            claims = jose_jwt.get_unverified_claims(token)
            if isinstance(claims, dict):
                return claims
    except Exception:
        # Anything unparsable simply yields the synthetic defaults.
        pass
    return {}


# Convenience functions (backward compatibility)
async def verify_google_token(token: str, client_id: str) -> Dict:
    """
    Asynchronously verifies a Google ID token and returns the payload if valid.
    In local test mode, bypass external verification and synthesize minimal claims.
    """
    if _oauth_test_mode_enabled():
        # Accept anything shaped like a JWT, otherwise synthesize from defaults
        unverified = _peek_unverified_claims(token)
        return {
            "sub": unverified.get("sub", "test-google-sub"),
            "email": unverified.get("email", "test.user@example.com"),
            "aud": client_id,
        }

    verifier = GoogleTokenVerifier(client_id)
    return await verifier.verify_token(token)


async def verify_google_access_token(token: str) -> Dict:
    """
    Asynchronously verifies a Google OAuth access token via the UserInfo endpoint.
    In local test mode, bypass network call and return synthetic user info.
    """
    if _oauth_test_mode_enabled():
        return {
            "sub": "test-google-access",
            "email": "test.access@example.com",
            "email_verified": True,
            "name": "Test Access",
            "picture": None,
        }

    verifier = GoogleAccessTokenVerifier()
    return await verifier.verify_access_token(token)


async def verify_apple_token(token: str, audience: str) -> Dict:
    """
    Asynchronously verifies an Apple identity token and returns the decoded payload.
    In local test mode, bypass external verification and synthesize minimal claims.
    """
    if _oauth_test_mode_enabled():
        unverified = _peek_unverified_claims(token)
        return {"sub": unverified.get("sub", "test-apple-sub"), "aud": audience}

    verifier = AppleTokenVerifier(audience)
    return await verifier.verify_token(token)


async def verify_facebook_token(token: str, app_id: str, app_secret: str) -> Dict:
    """
    Asynchronously verifies a Facebook access token and returns user data.
    In local test mode, bypass network call and return synthetic user data.
    """
    if _oauth_test_mode_enabled():
        return {"id": "test-facebook-id", "name": "Test FB", "email": "fb.test@example.com"}

    verifier = FacebookTokenVerifier(app_id, app_secret)
    return await verifier.verify_token(token)


# Example usage
async def example_usage():
    """Demonstrate the OAuthVerifier facade with placeholder credentials."""
    # Initialize verifier
    oauth_verifier = OAuthVerifier(
        google_client_id="your-google-client-id.googleusercontent.com",
        apple_audience="your.app.bundle.id"
    )

    try:
        # Verify Google token
        google_claims = await oauth_verifier.verify_google_token("google_id_token_here")
        print(f"Google user: {google_claims.get('email')}")

        # Verify Apple token
        apple_claims = await oauth_verifier.verify_apple_token("apple_id_token_here")
        print(f"Apple user: {apple_claims.get('sub')}")

    except TokenVerificationError as e:
        print(f"Verification failed: {e}")


if __name__ == "__main__":
    asyncio.run(example_usage())