MukeshKapoor25 commited on
Commit
981259a
·
1 Parent(s): 5f27f7b
Files changed (1) hide show
  1. app/utils/social_utils.py +175 -52
app/utils/social_utils.py CHANGED
@@ -2,69 +2,192 @@ from google.oauth2 import id_token as google_id_token
2
  from google.auth.transport import requests as google_requests
3
  from jose import jwt as jose_jwt, JWTError, jwk
4
  from jose.utils import base64url_decode
5
- from typing import Dict
6
  import httpx
7
- import json
8
-
9
- # Async wrapper to run Google’s sync function in a thread pool
10
  import asyncio
11
- from functools import partial
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
- # ✅ Async Google token verification
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  async def verify_google_token(token: str, client_id: str) -> Dict:
15
  """
16
  Asynchronously verifies a Google ID token and returns the payload if valid.
17
  """
18
- try:
19
- loop = asyncio.get_event_loop()
20
- # Run the sync method in a thread to avoid blocking
21
- idinfo = await loop.run_in_executor(
22
- None,
23
- partial(google_id_token.verify_oauth2_token, token, google_requests.Request(), client_id)
24
- )
25
- if idinfo.get('iss') not in ['accounts.google.com', 'https://accounts.google.com']:
26
- raise ValueError('Wrong issuer.')
27
- return idinfo
28
- except Exception as e:
29
- raise ValueError(f"Invalid Google token: {str(e)}")
30
 
31
-
32
- # ✅ Async Apple token verification
33
  async def verify_apple_token(token: str, audience: str) -> Dict:
34
  """
35
  Asynchronously verifies an Apple identity token and returns the decoded payload.
36
  """
37
- try:
38
- # Fetch Apple's public keys (JWKS)
39
- async with httpx.AsyncClient() as client:
40
- response = await client.get('https://appleid.apple.com/auth/keys')
41
- response.raise_for_status()
42
- keys = response.json().get('keys', [])
43
-
44
- # Decode header to get kid and alg
45
- header = jose_jwt.get_unverified_header(token)
46
- kid = header.get('kid')
47
- alg = header.get('alg')
48
-
49
- key = next((k for k in keys if k['kid'] == kid and k['alg'] == alg), None)
50
- if not key:
51
- raise ValueError("Public key not found for Apple token")
52
 
53
- public_key = jwk.construct(key)
54
- message, encoded_sig = token.rsplit('.', 1)
55
- decoded_sig = base64url_decode(encoded_sig.encode())
56
-
57
- if not public_key.verify(message.encode(), decoded_sig):
58
- raise ValueError("Invalid Apple token signature")
59
-
60
- claims = jose_jwt.decode(
61
- token,
62
- key,
63
- algorithms=['RS256'],
64
- audience=audience,
65
- issuer='https://appleid.apple.com'
66
- )
67
- return claims
 
 
 
 
68
 
69
- except Exception as e:
70
- raise ValueError(f"Invalid Apple token: {str(e)}")
 
2
  from google.auth.transport import requests as google_requests
3
  from jose import jwt as jose_jwt, JWTError, jwk
4
  from jose.utils import base64url_decode
5
+ from typing import Dict, Optional
6
  import httpx
 
 
 
7
  import asyncio
8
+ from functools import partial, lru_cache
9
+ import logging
10
+ from datetime import datetime, timedelta
11
+
12
+ logger = logging.getLogger(__name__)
13
+
14
+ class TokenVerificationError(Exception):
15
+ """Custom exception for token verification errors"""
16
+ pass
17
+
18
+ class GoogleTokenVerifier:
19
+ def __init__(self, client_id: str):
20
+ self.client_id = client_id
21
+
22
+ async def verify_token(self, token: str) -> Dict:
23
+ """
24
+ Asynchronously verifies a Google ID token and returns the payload if valid.
25
+ """
26
+ try:
27
+ loop = asyncio.get_event_loop()
28
+ # Run the sync method in a thread to avoid blocking
29
+ idinfo = await loop.run_in_executor(
30
+ None,
31
+ partial(google_id_token.verify_oauth2_token, token, google_requests.Request(), self.client_id)
32
+ )
33
+
34
+ # Validate issuer
35
+ if idinfo.get('iss') not in ['accounts.google.com', 'https://accounts.google.com']:
36
+ raise TokenVerificationError('Invalid issuer')
37
+
38
+ # Additional validation
39
+ if idinfo.get('aud') != self.client_id:
40
+ raise TokenVerificationError('Invalid audience')
41
+
42
+ # Check token expiration (extra safety)
43
+ exp = idinfo.get('exp')
44
+ if exp and datetime.fromtimestamp(exp) < datetime.utcnow():
45
+ raise TokenVerificationError('Token has expired')
46
+
47
+ logger.info(f"Successfully verified Google token for user: {idinfo.get('email')}")
48
+ return idinfo
49
+
50
+ except Exception as e:
51
+ logger.error(f"Google token verification failed: {str(e)}")
52
+ raise TokenVerificationError(f"Invalid Google token: {str(e)}")
53
 
54
+ class AppleTokenVerifier:
55
+ def __init__(self, audience: str, cache_duration: int = 3600):
56
+ self.audience = audience
57
+ self.cache_duration = cache_duration
58
+ self._keys_cache = None
59
+ self._cache_timestamp = None
60
+
61
+ async def _get_apple_keys(self) -> list:
62
+ """
63
+ Fetch Apple's public keys with caching to reduce API calls.
64
+ """
65
+ now = datetime.utcnow()
66
+
67
+ # Check if cache is still valid
68
+ if (self._keys_cache and self._cache_timestamp and
69
+ now - self._cache_timestamp < timedelta(seconds=self.cache_duration)):
70
+ return self._keys_cache
71
+
72
+ try:
73
+ async with httpx.AsyncClient(timeout=10.0) as client:
74
+ response = await client.get('https://appleid.apple.com/auth/keys')
75
+ response.raise_for_status()
76
+ keys = response.json().get('keys', [])
77
+
78
+ # Update cache
79
+ self._keys_cache = keys
80
+ self._cache_timestamp = now
81
+
82
+ logger.info("Successfully fetched Apple public keys")
83
+ return keys
84
+
85
+ except httpx.RequestError as e:
86
+ logger.error(f"Failed to fetch Apple keys: {str(e)}")
87
+ raise TokenVerificationError(f"Could not fetch Apple public keys: {str(e)}")
88
+
89
+ async def verify_token(self, token: str) -> Dict:
90
+ """
91
+ Asynchronously verifies an Apple identity token and returns the decoded payload.
92
+ """
93
+ try:
94
+ # Fetch Apple's public keys
95
+ keys = await self._get_apple_keys()
96
+
97
+ # Decode header to get kid and alg
98
+ header = jose_jwt.get_unverified_header(token)
99
+ kid = header.get('kid')
100
+ alg = header.get('alg')
101
+
102
+ if not kid or not alg:
103
+ raise TokenVerificationError("Token header missing required fields")
104
+
105
+ # Find matching key
106
+ key = next((k for k in keys if k['kid'] == kid and k['alg'] == alg), None)
107
+ if not key:
108
+ raise TokenVerificationError("Public key not found for Apple token")
109
+
110
+ # Verify signature manually (additional safety)
111
+ public_key = jwk.construct(key)
112
+ message, encoded_sig = token.rsplit('.', 1)
113
+ decoded_sig = base64url_decode(encoded_sig.encode())
114
+
115
+ if not public_key.verify(message.encode(), decoded_sig):
116
+ raise TokenVerificationError("Invalid Apple token signature")
117
+
118
+ # Decode and validate claims
119
+ claims = jose_jwt.decode(
120
+ token,
121
+ key,
122
+ algorithms=['RS256'],
123
+ audience=self.audience,
124
+ issuer='https://appleid.apple.com'
125
+ )
126
+
127
+ # Additional validation
128
+ if claims.get('aud') != self.audience:
129
+ raise TokenVerificationError('Invalid audience')
130
+
131
+ logger.info(f"Successfully verified Apple token for user: {claims.get('sub')}")
132
+ return claims
133
+
134
+ except JWTError as e:
135
+ logger.error(f"JWT error during Apple token verification: {str(e)}")
136
+ raise TokenVerificationError(f"Invalid Apple token: {str(e)}")
137
+ except Exception as e:
138
+ logger.error(f"Apple token verification failed: {str(e)}")
139
+ raise TokenVerificationError(f"Invalid Apple token: {str(e)}")
140
+
141
+ # Factory class for easier usage
142
+ class OAuthVerifier:
143
+ def __init__(self, google_client_id: Optional[str] = None, apple_audience: Optional[str] = None):
144
+ self.google_verifier = GoogleTokenVerifier(google_client_id) if google_client_id else None
145
+ self.apple_verifier = AppleTokenVerifier(apple_audience) if apple_audience else None
146
+
147
+ async def verify_google_token(self, token: str) -> Dict:
148
+ if not self.google_verifier:
149
+ raise TokenVerificationError("Google verifier not configured")
150
+ return await self.google_verifier.verify_token(token)
151
+
152
+ async def verify_apple_token(self, token: str) -> Dict:
153
+ if not self.apple_verifier:
154
+ raise TokenVerificationError("Apple verifier not configured")
155
+ return await self.apple_verifier.verify_token(token)
156
+
157
+ # Convenience functions (backward compatibility)
158
  async def verify_google_token(token: str, client_id: str) -> Dict:
159
  """
160
  Asynchronously verifies a Google ID token and returns the payload if valid.
161
  """
162
+ verifier = GoogleTokenVerifier(client_id)
163
+ return await verifier.verify_token(token)
 
 
 
 
 
 
 
 
 
 
164
 
 
 
165
  async def verify_apple_token(token: str, audience: str) -> Dict:
166
  """
167
  Asynchronously verifies an Apple identity token and returns the decoded payload.
168
  """
169
+ verifier = AppleTokenVerifier(audience)
170
+ return await verifier.verify_token(token)
 
 
 
 
 
 
 
 
 
 
 
 
 
171
 
172
+ # Example usage
173
+ async def example_usage():
174
+ # Initialize verifier
175
+ oauth_verifier = OAuthVerifier(
176
+ google_client_id="your-google-client-id.googleusercontent.com",
177
+ apple_audience="your.app.bundle.id"
178
+ )
179
+
180
+ try:
181
+ # Verify Google token
182
+ google_claims = await oauth_verifier.verify_google_token("google_id_token_here")
183
+ print(f"Google user: {google_claims.get('email')}")
184
+
185
+ # Verify Apple token
186
+ apple_claims = await oauth_verifier.verify_apple_token("apple_id_token_here")
187
+ print(f"Apple user: {apple_claims.get('sub')}")
188
+
189
+ except TokenVerificationError as e:
190
+ print(f"Verification failed: {e}")
191
 
192
+ if __name__ == "__main__":
193
+ asyncio.run(example_usage())