| """ |
| OAuth Token Persistence for Hugging Face Spaces |
| Stores refresh tokens in Google Cloud Secret Manager programmatically |
| """ |
|
|
| import logging |
| from typing import Optional, Dict, Any |
| import json |
| import os |
|
|
| try: |
| from google.cloud import secretmanager |
| from google.oauth2 import service_account |
| except ImportError: |
| secretmanager = None |
| service_account = None |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class OAuthTokenManager: |
| """Manages OAuth tokens with Secret Manager persistence""" |
| |
| def __init__(self): |
| self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT_ID', 'chatcal-voice') |
| self.secret_name = "oauth-refresh-tokens" |
| self.client = None |
| |
| |
| self._init_secret_manager() |
| |
| def _init_secret_manager(self): |
| """Initialize Google Cloud Secret Manager client""" |
| try: |
| if secretmanager is None: |
| logger.warning("google-cloud-secret-manager not available") |
| return |
| |
| |
| self.client = secretmanager.SecretManagerServiceClient() |
| logger.info("β
Secret Manager client initialized") |
| |
| except Exception as e: |
| logger.warning(f"β Failed to initialize Secret Manager: {e}") |
| |
| async def store_refresh_token(self, user_email: str, refresh_token: str) -> bool: |
| """Store refresh token in Secret Manager""" |
| if not self.client: |
| logger.warning("Secret Manager not available, using fallback storage") |
| return self._store_fallback(user_email, refresh_token) |
| |
| try: |
| |
| existing_tokens = await self.get_all_tokens() |
| |
| |
| existing_tokens[user_email] = { |
| "refresh_token": refresh_token, |
| "stored_at": self._get_timestamp() |
| } |
| |
| |
| secret_value = json.dumps(existing_tokens) |
| parent = f"projects/{self.project_id}" |
| secret_id = self.secret_name |
| |
| |
| try: |
| self.client.create_secret( |
| request={ |
| "parent": parent, |
| "secret_id": secret_id, |
| "secret": {"replication": {"automatic": {}}}, |
| } |
| ) |
| logger.info(f"Created new secret: {secret_id}") |
| except Exception: |
| |
| pass |
| |
| |
| self.client.add_secret_version( |
| request={ |
| "parent": f"{parent}/secrets/{secret_id}", |
| "payload": {"data": secret_value.encode("UTF-8")}, |
| } |
| ) |
| |
| logger.info(f"β
Stored refresh token for {user_email}") |
| return True |
| |
| except Exception as e: |
| logger.error(f"β Failed to store refresh token: {e}") |
| return self._store_fallback(user_email, refresh_token) |
| |
| async def get_refresh_token(self, user_email: str) -> Optional[str]: |
| """Retrieve refresh token from Secret Manager""" |
| if not self.client: |
| return self._get_fallback(user_email) |
| |
| try: |
| secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest" |
| response = self.client.access_secret_version(request={"name": secret_path}) |
| |
| secret_value = response.payload.data.decode("UTF-8") |
| tokens = json.loads(secret_value) |
| |
| user_data = tokens.get(user_email, {}) |
| refresh_token = user_data.get("refresh_token") |
| |
| if refresh_token: |
| logger.info(f"β
Retrieved refresh token for {user_email}") |
| return refresh_token |
| else: |
| logger.warning(f"β οΈ No refresh token found for {user_email}") |
| return None |
| |
| except Exception as e: |
| logger.error(f"β Failed to retrieve refresh token: {e}") |
| return self._get_fallback(user_email) |
| |
| async def get_all_tokens(self) -> Dict[str, Any]: |
| """Get all stored tokens""" |
| if not self.client: |
| return {} |
| |
| try: |
| secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest" |
| response = self.client.access_secret_version(request={"name": secret_path}) |
| |
| secret_value = response.payload.data.decode("UTF-8") |
| return json.loads(secret_value) |
| |
| except Exception: |
| return {} |
| |
| def _store_fallback(self, user_email: str, refresh_token: str) -> bool: |
| """Fallback storage using environment variables (not persistent)""" |
| try: |
| |
| os.environ[f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}"] = refresh_token |
| logger.warning(f"β οΈ Using fallback storage for {user_email} (not persistent)") |
| return True |
| except Exception as e: |
| logger.error(f"β Fallback storage failed: {e}") |
| return False |
| |
| def _get_fallback(self, user_email: str) -> Optional[str]: |
| """Fallback retrieval from environment variables""" |
| env_key = f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}" |
| token = os.getenv(env_key) |
| if token: |
| logger.warning(f"β οΈ Using fallback token for {user_email}") |
| return token |
| |
| def _get_timestamp(self) -> str: |
| """Get current timestamp""" |
| from datetime import datetime |
| return datetime.utcnow().isoformat() |
|
|
|
|
| |
| oauth_manager = OAuthTokenManager() |
|
|
|
|
| |
| async def save_oauth_token_after_auth(user_email: str, credentials): |
| """Call this after successful OAuth flow""" |
| if hasattr(credentials, 'refresh_token') and credentials.refresh_token: |
| success = await oauth_manager.store_refresh_token(user_email, credentials.refresh_token) |
| if success: |
| logger.info(f"OAuth token saved for {user_email}") |
| else: |
| logger.error(f"Failed to save OAuth token for {user_email}") |
|
|
|
|
| async def load_oauth_token_on_startup(user_email: str): |
| """Call this on app startup to restore tokens""" |
| refresh_token = await oauth_manager.get_refresh_token(user_email) |
| if refresh_token: |
| logger.info(f"OAuth token restored for {user_email}") |
| return refresh_token |
| else: |
| logger.warning(f"No stored OAuth token for {user_email}") |
| return None |