""" OAuth Token Persistence for Hugging Face Spaces Stores refresh tokens in Google Cloud Secret Manager programmatically """ import logging from typing import Optional, Dict, Any import json import os try: from google.cloud import secretmanager from google.oauth2 import service_account except ImportError: secretmanager = None service_account = None logger = logging.getLogger(__name__) class OAuthTokenManager: """Manages OAuth tokens with Secret Manager persistence""" def __init__(self): self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT_ID', 'chatcal-voice') self.secret_name = "oauth-refresh-tokens" self.client = None # Initialize Secret Manager client self._init_secret_manager() def _init_secret_manager(self): """Initialize Google Cloud Secret Manager client""" try: if secretmanager is None: logger.warning("google-cloud-secret-manager not available") return # Try to initialize with default credentials or service account self.client = secretmanager.SecretManagerServiceClient() logger.info("✅ Secret Manager client initialized") except Exception as e: logger.warning(f"❌ Failed to initialize Secret Manager: {e}") async def store_refresh_token(self, user_email: str, refresh_token: str) -> bool: """Store refresh token in Secret Manager""" if not self.client: logger.warning("Secret Manager not available, using fallback storage") return self._store_fallback(user_email, refresh_token) try: # Get existing tokens existing_tokens = await self.get_all_tokens() # Update with new token existing_tokens[user_email] = { "refresh_token": refresh_token, "stored_at": self._get_timestamp() } # Store back to Secret Manager secret_value = json.dumps(existing_tokens) parent = f"projects/{self.project_id}" secret_id = self.secret_name # Create secret if it doesn't exist try: self.client.create_secret( request={ "parent": parent, "secret_id": secret_id, "secret": {"replication": {"automatic": {}}}, } ) logger.info(f"Created new secret: {secret_id}") except Exception: # Secret already exists pass # Add new version self.client.add_secret_version( request={ "parent": f"{parent}/secrets/{secret_id}", "payload": {"data": secret_value.encode("UTF-8")}, } ) logger.info(f"✅ Stored refresh token for {user_email}") return True except Exception as e: logger.error(f"❌ Failed to store refresh token: {e}") return self._store_fallback(user_email, refresh_token) async def get_refresh_token(self, user_email: str) -> Optional[str]: """Retrieve refresh token from Secret Manager""" if not self.client: return self._get_fallback(user_email) try: secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest" response = self.client.access_secret_version(request={"name": secret_path}) secret_value = response.payload.data.decode("UTF-8") tokens = json.loads(secret_value) user_data = tokens.get(user_email, {}) refresh_token = user_data.get("refresh_token") if refresh_token: logger.info(f"✅ Retrieved refresh token for {user_email}") return refresh_token else: logger.warning(f"⚠️ No refresh token found for {user_email}") return None except Exception as e: logger.error(f"❌ Failed to retrieve refresh token: {e}") return self._get_fallback(user_email) async def get_all_tokens(self) -> Dict[str, Any]: """Get all stored tokens""" if not self.client: return {} try: secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest" response = self.client.access_secret_version(request={"name": secret_path}) secret_value = response.payload.data.decode("UTF-8") return json.loads(secret_value) except Exception: return {} def _store_fallback(self, user_email: str, refresh_token: str) -> bool: """Fallback storage using environment variables (not persistent)""" try: # Store in environment for current session only os.environ[f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}"] = refresh_token logger.warning(f"⚠️ Using fallback storage for {user_email} (not persistent)") return True except Exception as e: logger.error(f"❌ Fallback storage failed: {e}") return False def _get_fallback(self, user_email: str) -> Optional[str]: """Fallback retrieval from environment variables""" env_key = f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}" token = os.getenv(env_key) if token: logger.warning(f"⚠️ Using fallback token for {user_email}") return token def _get_timestamp(self) -> str: """Get current timestamp""" from datetime import datetime return datetime.utcnow().isoformat() # Global instance oauth_manager = OAuthTokenManager() # Usage example for integration: async def save_oauth_token_after_auth(user_email: str, credentials): """Call this after successful OAuth flow""" if hasattr(credentials, 'refresh_token') and credentials.refresh_token: success = await oauth_manager.store_refresh_token(user_email, credentials.refresh_token) if success: logger.info(f"OAuth token saved for {user_email}") else: logger.error(f"Failed to save OAuth token for {user_email}") async def load_oauth_token_on_startup(user_email: str): """Call this on app startup to restore tokens""" refresh_token = await oauth_manager.get_refresh_token(user_email) if refresh_token: logger.info(f"OAuth token restored for {user_email}") return refresh_token else: logger.warning(f"No stored OAuth token for {user_email}") return None