bookmyservice-ums / app /utils /social_utils.py
MukeshKapoor25's picture
feat(oauth): add local testing mode to bypass external verification
2fc2b48
from google.oauth2 import id_token as google_id_token
from google.auth.transport import requests as google_requests
from jose import jwt as jose_jwt, JWTError, jwk
from jose.utils import base64url_decode
from typing import Dict, Optional
import httpx
import asyncio
from functools import partial, lru_cache
import logging
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class TokenVerificationError(Exception):
"""Custom exception for token verification errors"""
pass
class FacebookTokenVerifier:
def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
async def verify_token(self, token: str) -> Dict:
"""
Asynchronously verifies a Facebook access token and returns user data.
"""
try:
# First, verify the token with Facebook's debug endpoint
async with httpx.AsyncClient(timeout=10.0) as client:
# Verify token validity
debug_url = f"https://graph.facebook.com/debug_token"
debug_params = {
"input_token": token,
"access_token": f"{self.app_id}|{self.app_secret}"
}
debug_response = await client.get(debug_url, params=debug_params)
debug_response.raise_for_status()
debug_data = debug_response.json()
if not debug_data.get("data", {}).get("is_valid"):
raise TokenVerificationError("Invalid Facebook token")
# Check if token is for our app
token_app_id = debug_data.get("data", {}).get("app_id")
if token_app_id != self.app_id:
raise TokenVerificationError("Token not for this app")
# Get user data
user_url = "https://graph.facebook.com/me"
user_params = {
"access_token": token,
"fields": "id,name,email,picture.type(large)"
}
user_response = await client.get(user_url, params=user_params)
user_response.raise_for_status()
user_data = user_response.json()
# Validate required fields
if not user_data.get("id"):
raise TokenVerificationError("Missing user ID in Facebook response")
logger.info(f"Successfully verified Facebook token for user: {user_data.get('email', user_data.get('id'))}")
return user_data
except httpx.RequestError as e:
logger.error(f"Facebook token verification request failed: {str(e)}")
raise TokenVerificationError(f"Facebook API request failed: {str(e)}")
except Exception as e:
logger.error(f"Facebook token verification failed: {str(e)}")
raise TokenVerificationError(f"Invalid Facebook token: {str(e)}")
class GoogleTokenVerifier:
def __init__(self, client_id: str):
self.client_id = client_id
async def verify_token(self, token: str) -> Dict:
"""
Asynchronously verifies a Google ID token and returns the payload if valid.
"""
try:
loop = asyncio.get_event_loop()
# Run the sync method in a thread to avoid blocking
idinfo = await loop.run_in_executor(
None,
partial(google_id_token.verify_oauth2_token, token, google_requests.Request(), self.client_id)
)
# Validate issuer
if idinfo.get('iss') not in ['accounts.google.com', 'https://accounts.google.com']:
raise TokenVerificationError('Invalid issuer')
# Additional validation
if idinfo.get('aud') != self.client_id:
raise TokenVerificationError('Invalid audience')
# Check token expiration (extra safety)
exp = idinfo.get('exp')
if exp and datetime.fromtimestamp(exp) < datetime.utcnow():
raise TokenVerificationError('Token has expired')
logger.info(f"Successfully verified Google token for user: {idinfo.get('email')}")
return idinfo
except Exception as e:
logger.error(f"Google token verification failed: {str(e)}")
raise TokenVerificationError(f"Invalid Google token: {str(e)}")
class GoogleAccessTokenVerifier:
def __init__(self):
pass
async def verify_access_token(self, access_token: str) -> Dict:
"""
Verify Google OAuth access token by calling the OIDC UserInfo endpoint.
Returns normalized user info containing at least 'sub' and 'email' if available.
"""
try:
headers = {"Authorization": f"Bearer {access_token}"}
async with httpx.AsyncClient(timeout=10.0) as client:
# Google OIDC UserInfo endpoint
resp = await client.get("https://openidconnect.googleapis.com/v1/userinfo", headers=headers)
resp.raise_for_status()
data = resp.json()
# Basic sanity checks
if not data.get("sub") and not data.get("id"):
raise TokenVerificationError("Missing subject/id in Google userinfo response")
# Normalize shape similar to ID token claims
user_info = {
"sub": data.get("sub") or data.get("id"),
"email": data.get("email"),
"email_verified": data.get("email_verified"),
"name": data.get("name"),
"picture": data.get("picture"),
}
logger.info(f"Successfully verified Google access token for user: {user_info.get('email')}")
return user_info
except httpx.HTTPStatusError as e:
logger.error(f"Google userinfo verification failed: {e.response.text}")
raise TokenVerificationError("Invalid Google access token")
except httpx.RequestError as e:
logger.error(f"Google userinfo request error: {str(e)}")
raise TokenVerificationError(f"Google userinfo request failed: {str(e)}")
except Exception as e:
logger.error(f"Google access token verification failed: {str(e)}")
raise TokenVerificationError(f"Invalid Google access token: {str(e)}")
class AppleTokenVerifier:
def __init__(self, audience: str, cache_duration: int = 3600):
self.audience = audience
self.cache_duration = cache_duration
self._keys_cache = None
self._cache_timestamp = None
async def _get_apple_keys(self) -> list:
"""
Fetch Apple's public keys with caching to reduce API calls.
"""
now = datetime.utcnow()
# Check if cache is still valid
if (self._keys_cache and self._cache_timestamp and
now - self._cache_timestamp < timedelta(seconds=self.cache_duration)):
return self._keys_cache
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get('https://appleid.apple.com/auth/keys')
response.raise_for_status()
keys = response.json().get('keys', [])
# Update cache
self._keys_cache = keys
self._cache_timestamp = now
logger.info("Successfully fetched Apple public keys")
return keys
except httpx.RequestError as e:
logger.error(f"Failed to fetch Apple keys: {str(e)}")
raise TokenVerificationError(f"Could not fetch Apple public keys: {str(e)}")
async def verify_token(self, token: str) -> Dict:
"""
Asynchronously verifies an Apple identity token and returns the decoded payload.
"""
try:
# Fetch Apple's public keys
keys = await self._get_apple_keys()
# Decode header to get kid and alg
header = jose_jwt.get_unverified_header(token)
kid = header.get('kid')
alg = header.get('alg')
if not kid or not alg:
raise TokenVerificationError("Token header missing required fields")
# Find matching key
key = next((k for k in keys if k['kid'] == kid and k['alg'] == alg), None)
if not key:
raise TokenVerificationError("Public key not found for Apple token")
# Verify signature manually (additional safety)
public_key = jwk.construct(key)
message, encoded_sig = token.rsplit('.', 1)
decoded_sig = base64url_decode(encoded_sig.encode())
if not public_key.verify(message.encode(), decoded_sig):
raise TokenVerificationError("Invalid Apple token signature")
# Decode and validate claims
claims = jose_jwt.decode(
token,
key,
algorithms=['RS256'],
audience=self.audience,
issuer='https://appleid.apple.com'
)
# Additional validation
if claims.get('aud') != self.audience:
raise TokenVerificationError('Invalid audience')
logger.info(f"Successfully verified Apple token for user: {claims.get('sub')}")
return claims
except JWTError as e:
logger.error(f"JWT error during Apple token verification: {str(e)}")
raise TokenVerificationError(f"Invalid Apple token: {str(e)}")
except Exception as e:
logger.error(f"Apple token verification failed: {str(e)}")
raise TokenVerificationError(f"Invalid Apple token: {str(e)}")
# Factory class for easier usage
class OAuthVerifier:
def __init__(self, google_client_id: Optional[str] = None, apple_audience: Optional[str] = None,
facebook_app_id: Optional[str] = None, facebook_app_secret: Optional[str] = None):
self.google_verifier = GoogleTokenVerifier(google_client_id) if google_client_id else None
self.google_access_verifier = GoogleAccessTokenVerifier()
self.apple_verifier = AppleTokenVerifier(apple_audience) if apple_audience else None
self.facebook_verifier = FacebookTokenVerifier(facebook_app_id, facebook_app_secret) if facebook_app_id and facebook_app_secret else None
async def verify_google_token(self, token: str) -> Dict:
if not self.google_verifier:
raise TokenVerificationError("Google verifier not configured")
return await self.google_verifier.verify_token(token)
async def verify_google_access_token(self, token: str) -> Dict:
return await self.google_access_verifier.verify_access_token(token)
async def verify_apple_token(self, token: str) -> Dict:
if not self.apple_verifier:
raise TokenVerificationError("Apple verifier not configured")
return await self.apple_verifier.verify_token(token)
async def verify_facebook_token(self, token: str) -> Dict:
if not self.facebook_verifier:
raise TokenVerificationError("Facebook verifier not configured")
return await self.facebook_verifier.verify_token(token)
# Convenience functions (backward compatibility)
async def verify_google_token(token: str, client_id: str) -> Dict:
"""
Asynchronously verifies a Google ID token and returns the payload if valid.
In local test mode, bypass external verification and synthesize minimal claims.
"""
try:
from app.core.config import settings
if getattr(settings, "OAUTH_TEST_MODE", False):
# Accept anything shaped like a JWT, otherwise synthesize from raw token
sub = "test-google-sub"
email = "test.user@example.com"
if token.count(".") == 2:
# Try to decode header/payload without verification to extract email/sub if present
try:
unverified = jose_jwt.get_unverified_claims(token)
sub = unverified.get("sub", sub)
email = unverified.get("email", email)
except Exception:
pass
return {"sub": sub, "email": email, "aud": client_id}
except Exception:
# Fall through to real verification if settings import fails
pass
verifier = GoogleTokenVerifier(client_id)
return await verifier.verify_token(token)
async def verify_google_access_token(token: str) -> Dict:
"""
Asynchronously verifies a Google OAuth access token via the UserInfo endpoint.
In local test mode, bypass network call and return synthetic user info.
"""
try:
from app.core.config import settings
if getattr(settings, "OAUTH_TEST_MODE", False):
# Strip optional Bearer prefix
t = token
if t.lower().startswith("bearer "):
t = t[7:]
return {
"sub": "test-google-access",
"email": "test.access@example.com",
"email_verified": True,
"name": "Test Access",
"picture": None,
}
except Exception:
pass
verifier = GoogleAccessTokenVerifier()
return await verifier.verify_access_token(token)
async def verify_apple_token(token: str, audience: str) -> Dict:
"""
Asynchronously verifies an Apple identity token and returns the decoded payload.
In local test mode, bypass external verification and synthesize minimal claims.
"""
try:
from app.core.config import settings
if getattr(settings, "OAUTH_TEST_MODE", False):
sub = "test-apple-sub"
if token.count(".") == 2:
try:
unverified = jose_jwt.get_unverified_claims(token)
sub = unverified.get("sub", sub)
except Exception:
pass
return {"sub": sub, "aud": audience}
except Exception:
pass
verifier = AppleTokenVerifier(audience)
return await verifier.verify_token(token)
async def verify_facebook_token(token: str, app_id: str, app_secret: str) -> Dict:
"""
Asynchronously verifies a Facebook access token and returns user data.
In local test mode, bypass network call and return synthetic user data.
"""
try:
from app.core.config import settings
if getattr(settings, "OAUTH_TEST_MODE", False):
return {"id": "test-facebook-id", "name": "Test FB", "email": "fb.test@example.com"}
except Exception:
pass
verifier = FacebookTokenVerifier(app_id, app_secret)
return await verifier.verify_token(token)
# Example usage
async def example_usage():
# Initialize verifier
oauth_verifier = OAuthVerifier(
google_client_id="your-google-client-id.googleusercontent.com",
apple_audience="your.app.bundle.id"
)
try:
# Verify Google token
google_claims = await oauth_verifier.verify_google_token("google_id_token_here")
print(f"Google user: {google_claims.get('email')}")
# Verify Apple token
apple_claims = await oauth_verifier.verify_apple_token("apple_id_token_here")
print(f"Apple user: {apple_claims.get('sub')}")
except TokenVerificationError as e:
print(f"Verification failed: {e}")
if __name__ == "__main__":
asyncio.run(example_usage())